Skip to content

feat(share): Periodic GC over EDSStore #1359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion nodebuilder/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type Module interface {
// BandwidthForPeer returns a Stats struct with bandwidth metrics associated with the given peer.ID.
// The metrics returned include all traffic sent / received for the peer, regardless of protocol.
BandwidthForPeer(id peer.ID) metrics.Stats
// BandwidthForProtocol returns a Stats struct with bandwidth metrics associated with the given protocol.ID.
// BandwidthForProtocol returns a Stats struct with bandwidth metrics associated with the given
// protocol.ID.
BandwidthForProtocol(proto protocol.ID) metrics.Stats

// ResourceState returns the state of the resource manager.
Expand Down
52 changes: 44 additions & 8 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"os"
"sync/atomic"
"time"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/index"
Expand All @@ -21,20 +23,27 @@ const (
blocksPath = "/blocks/"
indexPath = "/index/"
transientsPath = "/transients/"

defaultGCInterval = time.Hour
)

// Store maintains (via DAGStore) a top-level index enabling granular and efficient random access to
// every share and/or Merkle proof over every registered CARv1 file. The EDSStore provides a custom
// Blockstore interface implementation to achieve access. The main use-case is randomized sampling
// over the whole chain of EDS block data and getting data by namespace.
type Store struct {
cancel context.CancelFunc

dgstr *dagstore.DAGStore
mounts *mount.Registry

topIdx index.Inverted
carIdx index.FullIndexRepo

basepath string
basepath string
gcInterval time.Duration
// lastGCResult is only stored on the store for testing purposes.
lastGCResult atomic.Pointer[dagstore.GCResult]
}

// NewStore creates a new EDS Store under the given basepath and datastore.
Expand Down Expand Up @@ -70,25 +79,52 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
}

return &Store{
basepath: basepath,
dgstr: dagStore,
topIdx: invertedRepo,
carIdx: fsRepo,
mounts: r,
basepath: basepath,
dgstr: dagStore,
topIdx: invertedRepo,
carIdx: fsRepo,
gcInterval: defaultGCInterval,
mounts: r,
}, nil
}

// Start starts the underlying DAGStore.
func (s *Store) Start(context.Context) error {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel

go s.gc(ctx)
return s.dgstr.Start(ctx)
}

// Stop stops the underlying DAGStore.
func (s *Store) Stop(context.Context) error {
defer s.cancel()
return s.dgstr.Close()
}

// gc periodically removes all inactive or errored shards.
func (s *Store) gc(ctx context.Context) {
ticker := time.NewTicker(s.gcInterval)
// initialize empty gc result to avoid panic on access
s.lastGCResult.Store(&dagstore.GCResult{
Shards: make(map[shard.Key]error),
})
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
res, err := s.dgstr.GC(ctx)
if err != nil {
log.Errorf("garbage collecting dagstore: %v", err)
return
}
s.lastGCResult.Store(res)
}

}
}

// Put stores the given data square with DataRoot's hash as a key.
//
// The square is verified on the Exchange level, and Put only stores the square, trusting it.
Expand Down
39 changes: 37 additions & 2 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"fmt"
"os"
"testing"

"github.com/stretchr/testify/assert"
"time"

"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/ipld/go-car"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share"
Expand Down Expand Up @@ -156,6 +156,41 @@ func TestEDSStore_Has(t *testing.T) {
assert.True(t, ok)
}

// TestEDSStore_GC verifies that unused transient shards are collected by the GC periodically.
func TestEDSStore_GC(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

edsStore, err := newStore(t)
edsStore.gcInterval = time.Second
require.NoError(t, err)

// kicks off the gc goroutine
err = edsStore.Start(ctx)
require.NoError(t, err)

eds, dah := randomEDS(t)
shardKey := shard.KeyFromString(dah.String())

err = edsStore.Put(ctx, dah, eds)
require.NoError(t, err)

// doesn't exist yet
assert.NotContains(t, edsStore.lastGCResult.Load().Shards, shardKey)

// wait for gc to run, retry three times
for i := 0; i < 3; i++ {
time.Sleep(edsStore.gcInterval)
if _, ok := edsStore.lastGCResult.Load().Shards[shardKey]; ok {
break
}
}
assert.Contains(t, edsStore.lastGCResult.Load().Shards, shardKey)

// assert nil in this context means there was no error re-acquiring the shard during GC
assert.Nil(t, edsStore.lastGCResult.Load().Shards[shardKey])
}

func newStore(t *testing.T) (*Store, error) {
t.Helper()

Expand Down