Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ethdb/pebble: prevent shutdown-panic #27238

Merged
merged 4 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions ethdb/dbtest/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,32 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) {
}
}
})

t.Run("OperatonsAfterClose", func(t *testing.T) {
db := New()
db.Put([]byte("key"), []byte("value"))
db.Close()
if _, err := db.Get([]byte("key")); err == nil {
t.Fatalf("expected error on Get after Close")
}
if _, err := db.Has([]byte("key")); err == nil {
t.Fatalf("expected error on Get after Close")
}
if err := db.Put([]byte("key2"), []byte("value2")); err == nil {
t.Fatalf("expected error on Put after Close")
}
if err := db.Delete([]byte("key")); err == nil {
t.Fatalf("expected error on Delete after Close")
}

b := db.NewBatch()
if err := b.Put([]byte("batchkey"), []byte("batchval")); err != nil {
t.Fatalf("expected no error on batch.Put after Close, got %v", err)
}
if err := b.Write(); err == nil {
t.Fatalf("expected error on batch.Write after Close")
}
})
}

// BenchDatabaseSuite runs a suite of benchmarks against a KeyValueStore database
Expand Down
3 changes: 3 additions & 0 deletions ethdb/memorydb/memorydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (b *batch) Write() error {
b.db.lock.Lock()
defer b.db.lock.Unlock()

if b.db.db == nil {
return errMemorydbClosed
}
for _, keyvalue := range b.writes {
if keyvalue.delete {
delete(b.db.db, string(keyvalue.key))
Expand Down
49 changes: 39 additions & 10 deletions ethdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ type Database struct {
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated

quitLock sync.Mutex // Mutex protecting the quit channel access
quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
closed bool // keep track of whether we're Closed

log log.Logger // Contextual logger tracking the database path

Expand Down Expand Up @@ -221,23 +222,29 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
func (d *Database) Close() error {
d.quitLock.Lock()
defer d.quitLock.Unlock()

// Allow double closing, simplifies things
if d.quitChan == nil {
if d.closed {
return nil
}
errc := make(chan error)
d.quitChan <- errc
if err := <-errc; err != nil {
d.log.Error("Metrics collection failed", "err", err)
d.closed = true
if d.quitChan != nil {
errc := make(chan error)
d.quitChan <- errc
if err := <-errc; err != nil {
d.log.Error("Metrics collection failed", "err", err)
}
d.quitChan = nil
}
d.quitChan = nil

return d.db.Close()
}

// Has retrieves if a key is present in the key-value store.
func (d *Database) Has(key []byte) (bool, error) {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return false, pebble.ErrClosed
}
_, closer, err := d.db.Get(key)
if err == pebble.ErrNotFound {
return false, nil
Expand All @@ -250,6 +257,11 @@ func (d *Database) Has(key []byte) (bool, error) {

// Get retrieves the given key if it's present in the key-value store.
func (d *Database) Get(key []byte) ([]byte, error) {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return nil, pebble.ErrClosed
}
dat, closer, err := d.db.Get(key)
if err != nil {
return nil, err
Expand All @@ -262,19 +274,30 @@ func (d *Database) Get(key []byte) ([]byte, error) {

// Put inserts the given value into the key-value store.
func (d *Database) Put(key []byte, value []byte) error {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return pebble.ErrClosed
}
return d.db.Set(key, value, pebble.NoSync)
}

// Delete removes the key from the key-value store.
func (d *Database) Delete(key []byte) error {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return pebble.ErrClosed
}
return d.db.Delete(key, nil)
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (d *Database) NewBatch() ethdb.Batch {
return &batch{
b: d.db.NewBatch(),
b: d.db.NewBatch(),
db: d,
}
}

Expand Down Expand Up @@ -481,6 +504,7 @@ func (d *Database) meter(refresh time.Duration) {
// when Write is called. A batch cannot be used concurrently.
type batch struct {
b *pebble.Batch
db *Database
size int
}

Expand All @@ -505,6 +529,11 @@ func (b *batch) ValueSize() int {

// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
b.db.quitLock.RLock()
defer b.db.quitLock.RUnlock()
if b.db.closed {
return pebble.ErrClosed
}
return b.b.Commit(pebble.NoSync)
}

Expand Down