Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

triedb/pathdb, eth: use double-buffer mechanism in pathdb #30464

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/state/snapshot/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ func (t *testHelper) Commit() common.Hash {
}
t.triedb.Update(root, types.EmptyRootHash, 0, t.nodes, t.states)
t.triedb.Commit(root, false)

// re-open the trie database to ensure the frozen buffer
// is not referenced
config := &triedb.Config{}
if t.triedb.Scheme() == rawdb.PathScheme {
config.PathDB = &pathdb.Config{} // disable caching
} else {
config.HashDB = &hashdb.Config{} // disable caching
}
t.triedb = triedb.NewDatabase(t.triedb.Disk(), config)
return root
}

Expand Down
47 changes: 31 additions & 16 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,19 +978,22 @@ func TestMissingTrieNodes(t *testing.T) {
func testMissingTrieNodes(t *testing.T, scheme string) {
// Create an initial state with a few accounts
var (
tdb *triedb.Database
memDb = rawdb.NewMemoryDatabase()
tdb *triedb.Database
memDb = rawdb.NewMemoryDatabase()
openDb = func() *triedb.Database {
if scheme == rawdb.PathScheme {
return triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
CleanCacheSize: 0,
WriteBufferSize: 0,
}}) // disable caching
} else {
return triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
CleanCacheSize: 0,
}}) // disable caching
}
}
)
if scheme == rawdb.PathScheme {
tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
CleanCacheSize: 0,
WriteBufferSize: 0,
}}) // disable caching
} else {
tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
CleanCacheSize: 0,
}}) // disable caching
}
tdb = openDb()
db := NewDatabase(tdb, nil)

var root common.Hash
Expand All @@ -1008,17 +1011,29 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
tdb.Commit(root, false)
}
// Create a new state on the old root
state, _ = New(root, db)
// Now we clear out the memdb
it := memDb.NewIterator(nil, nil)
for it.Next() {
k := it.Key()

// Leave the root intact
if !bytes.Equal(k, root[:]) {
t.Logf("key: %x", k)
memDb.Delete(k)
if scheme == rawdb.HashScheme {
if !bytes.Equal(k, root[:]) {
t.Logf("key: %x", k)
memDb.Delete(k)
}
}
if scheme == rawdb.PathScheme {
rk := k[len(rawdb.TrieNodeAccountPrefix):]
if len(rk) != 0 {
t.Logf("key: %x", k)
memDb.Delete(k)
}
}
}
tdb = openDb()
db = NewDatabase(tdb, nil)
state, _ = New(root, db)
balance := state.GetBalance(addr)
// The removed elem should lead to it returning zero balance
if exp, got := uint64(0), balance.Uint64(); got != exp {
Expand Down
85 changes: 57 additions & 28 deletions triedb/pathdb/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package pathdb

import (
"errors"
"fmt"
"time"

Expand All @@ -37,6 +38,9 @@ type buffer struct {
limit uint64 // The maximum memory allowance in bytes
nodes *nodeSet // Aggregated trie node set
states *stateSet // Aggregated state set

done chan struct{} // notifier whether the content in buffer has been flushed or not
flushErr error // error if any exception occurs during flushing
}

// newBuffer initializes the buffer with the provided states and trie nodes.
Expand Down Expand Up @@ -124,36 +128,61 @@ func (b *buffer) size() uint64 {

// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, nodesCache *fastcache.Cache, id uint64) error {
// Ensure the target state id is aligned with the internal counter.
head := rawdb.ReadPersistentStateID(db)
if head+b.layers != id {
return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, nodesCache *fastcache.Cache, id uint64) {
if b.done != nil {
panic("duplicated flush operation")
}
// Terminate the state snapshot generation if it's active
var (
start = time.Now()
batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff
)
// Explicitly sync the state freezer, ensuring that all written
// data is transferred to disk before updating the key-value store.
if freezer != nil {
if err := freezer.Sync(); err != nil {
return err
b.done = make(chan struct{})

go func() {
defer func() {
close(b.done)
}()

// Ensure the target state id is aligned with the internal counter.
head := rawdb.ReadPersistentStateID(db)
if head+b.layers != id {
b.flushErr = fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
return
}
}
nodes := b.nodes.write(batch, nodesCache)
rawdb.WritePersistentStateID(batch, id)
// Terminate the state snapshot generation if it's active
var (
start = time.Now()
batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff
)
// Explicitly sync the state freezer, ensuring that all written
// data is transferred to disk before updating the key-value store.
if freezer != nil {
if err := freezer.Sync(); err != nil {
b.flushErr = err
return
}
}
nodes := b.nodes.write(batch, nodesCache)
rawdb.WritePersistentStateID(batch, id)

// Flush all mutations in a single batch
size := batch.ValueSize()
if err := batch.Write(); err != nil {
b.flushErr = err
return
}
// The content in the frozen buffer is kept for consequent state access,
// TODO (rjl493456442) measure the gc overhead for holding this struct.

commitBytesMeter.Mark(int64(size))
commitNodesMeter.Mark(int64(nodes))
commitTimeTimer.UpdateSince(start)
log.Debug("Persisted buffer content", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
}()
}

// Flush all mutations in a single batch
size := batch.ValueSize()
if err := batch.Write(); err != nil {
return err
// waitFlush blocks until the buffer has been fully flushed and returns any
// stored errors that occurred during the process.
func (b *buffer) waitFlush() error {
if b.done == nil {
return errors.New("the buffer is not frozen")
}
commitBytesMeter.Mark(int64(size))
commitNodesMeter.Mark(int64(nodes))
commitTimeTimer.UpdateSince(start)
b.reset()
log.Debug("Persisted buffer content", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
return nil
<-b.done
return b.flushErr
}
10 changes: 8 additions & 2 deletions triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (db *Database) Enable(root common.Hash) error {
}
// Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**.
db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0)))
db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0), nil))

// Re-enable the database as the final step.
db.waitSync = false
Expand Down Expand Up @@ -476,7 +476,13 @@ func (db *Database) Close() error {
db.readOnly = true

// Release the memory held by clean cache.
db.tree.bottom().resetCache()
disk := db.tree.bottom()
if disk.frozen != nil {
if err := disk.frozen.waitFlush(); err != nil {
return err
}
}
disk.resetCache()

// Close the attached state history freezer.
if db.freezer == nil {
Expand Down
4 changes: 2 additions & 2 deletions triedb/pathdb/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes *no
}

// persist flushes the diff layer and all its parent layers to disk layer.
func (dl *diffLayer) persist(force bool) (layer, error) {
func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
if parent, ok := dl.parentLayer().(*diffLayer); ok {
// Hold the lock to prevent any read operation until the new
// parent is linked correctly.
Expand All @@ -183,7 +183,7 @@ func (dl *diffLayer) size() uint64 {

// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
// it. The method will panic if called onto a non-bottom-most diff layer.
func diffToDisk(layer *diffLayer, force bool) (layer, error) {
func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
disk, ok := layer.parentLayer().(*diskLayer)
if !ok {
panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))
Expand Down
Loading