Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config for Automated GC and tests for this feature #133

Open
wants to merge 1 commit into
base: feat/abstract-gc-api
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion accessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestMmap(t *testing.T) {
tempdir := t.TempDir()

var err error
mnt, err = mount.Upgrade(mnt, throttle.Noop(), tempdir, "foo", "")
mnt, err = mount.Upgrade(mnt, throttle.Noop(), tempdir, "foo", "", &mount.SimpleDownloader{})
require.NoError(t, err)

// warm up the upgrader so a transient is created, and we can obtain
Expand Down
185 changes: 177 additions & 8 deletions dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/filecoin-project/dagstore/gc"

mh "github.com/multiformats/go-multihash"

carindex "github.com/ipld/go-car/v2/index"
Expand Down Expand Up @@ -60,6 +63,10 @@ var (
// ErrShardInitializationFailed is returned when shard initialization fails.
ErrShardInitializationFailed = errors.New("shard initialization failed")

// ErrShardIllegalReservationRequest is returned when we get a reservation request for a shard
// that is not in a valid state for reservation requests.
ErrShardIllegalReservationRequest = errors.New("illegal shard reservation request")

// ErrShardInUse is returned when the user attempts to destroy a shard that
// is in use.
ErrShardInUse = errors.New("shard in use")
Expand Down Expand Up @@ -97,6 +104,11 @@ type DAGStore struct {
// gcCh is where requests for GC are sent.
gcCh chan chan *GCResult

// Channels not owned by us.
//
// automatedgcTraceCh is where the Automated GC trace will be sent, if channel is non-nil.
automatedgcTraceCh chan AutomatedGCResult

// Channels not owned by us.
//
// traceCh is where traces on shard operations will be sent, if non-nil.
Expand All @@ -114,6 +126,21 @@ type DAGStore struct {
ctx context.Context
cancelFn context.CancelFunc
wg sync.WaitGroup

// Automated GC state
//---
// counter tracking the size of the transient directory along with reservations; guarded by the event loop
totalTransientDirSize int64
//
// The garbage collector strategy we are using; calls to this should only be made from the event loop
gcs gc.GarbageCollectionStrategy
//
// immutable, can be read anywhere without a lock.
defaultReservationSize int64
automatedGCEnabled bool
maxTransientDirSize int64
transientsGCWatermarkHigh float64
transientsGCWatermarkLow float64
}

var _ Interface = (*DAGStore)(nil)
Expand All @@ -123,8 +150,25 @@ type dispatch struct {
res *ShardResult
}

// Request/response payloads carried on a task for transient-space
// reservations.
//
// NOTE(review): the field semantics below are inferred from the field names
// only; confirm against the code that consumes these messages.
type (
	// reservationReq is attached to a task to ask for space; the answer is
	// expected on the response channel.
	reservationReq struct {
		// presumably how many reservations the requester has already made — TODO confirm
		nPrevReservations int64
		// presumably the amount of space being requested — TODO confirm
		want int64
		// channel on which the matching reservationResp is delivered
		response chan *reservationResp
	}

	// reservationResp is the reply to a reservationReq: either an amount
	// that was reserved or an error.
	reservationResp struct {
		reserved int64
		err      error
	}

	// releaseReq is attached to a task to hand back previously reserved space.
	releaseReq struct {
		release int64
	}
)

// Task represents an operation to be performed on a shard or the DAG store.
type task struct {
reservationReq *reservationReq
releaseReq *releaseReq
*waiter
op OpType
shard *Shard
Expand All @@ -138,6 +182,35 @@ type ShardResult struct {
Accessor *ShardAccessor
}

// AutomatedGCConfig groups the tunables for the automated, watermark-based
// garbage collection of the transients directory. It is only read when
// Config.AutomatedGCEnabled is set.
type AutomatedGCConfig struct {
// DefaultReservationSize configures the default amount of space to reserve for a transient
// when downloading it if the size of the transient is not known upfront.
// A zero value keeps the built-in default reservation size.
DefaultReservationSize int64

// GarbeCollectionStrategy specifies the garbage collection strategy we will use
// for the automated watermark based GC feature. See the documentation of `gc.GarbageCollectionStrategy` for more details.
// It must be non-nil when automated GC is enabled.
GarbeCollectionStrategy gc.GarbageCollectionStrategy

// MaxTransientDirSize specifies the maximum allowable size of the transients directory.
// It must be non-zero when automated GC is enabled.
MaxTransientDirSize int64

// TransientsGCWatermarkHigh is the proportion of `MaxTransientDirSize` at which we proactively start GCing
// the transients directory, until the ratio (transient directory size / `MaxTransientDirSize`) is equal to or less
// than the `TransientsGCWatermarkLow` config param below.
// It must be non-zero and strictly greater than `TransientsGCWatermarkLow`.
TransientsGCWatermarkHigh float64

// TransientsGCWatermarkLow is the ratio at which a GC run started by crossing
// `TransientsGCWatermarkHigh` stops; see the documentation of `TransientsGCWatermarkHigh` above.
// It must be non-zero.
TransientsGCWatermarkLow float64

// AutomatedGCTraceCh is a channel where the caller desires to be notified of every
// Automated GC reclaim operation. Publishing to this channel blocks the event loop, so the
// caller must ensure the channel is serviced appropriately.
//
// Note: Not actively consuming from this channel will make the event
// loop block.
AutomatedGCTraceCh chan AutomatedGCResult
}

type Config struct {
// TransientsDir is the path to directory where local transient files will
// be created for remote mounts.
Expand Down Expand Up @@ -183,6 +256,12 @@ type Config struct {
// RecoverOnStart specifies whether failed shards should be recovered
// on start.
RecoverOnStart RecoverOnStartPolicy

// AutomatedGCEnabled enables Automated GC according to the given GC policy.
AutomatedGCEnabled bool

// AutomatedGCConfig specifies the configuration parameters to use for the Automated GC.
AutomatedGCConfig *AutomatedGCConfig
}

// NewDAGStore constructs a new DAG store with the supplied configuration.
Expand All @@ -203,6 +282,12 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
cfg.IndexRepo = index.NewMemoryRepo()
}

if cfg.AutomatedGCEnabled {
if err := validateAutomatedGCConfig(cfg.AutomatedGCConfig); err != nil {
return nil, err
}
}

if cfg.TopLevelIndex == nil {
log.Info("using in-memory inverted index")
cfg.TopLevelIndex = index.NewInverted(dssync.MutexWrap(ds.NewMapDatastore()))
Expand Down Expand Up @@ -240,6 +325,8 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
throttleReaadyFetch: throttle.Noop(),
ctx: ctx,
cancelFn: cancel,
automatedGCEnabled: cfg.AutomatedGCEnabled,
gcs: &gc.NoOpStrategy{},
}

if max := cfg.MaxConcurrentIndex; max > 0 {
Expand All @@ -250,9 +337,57 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
dagst.throttleReaadyFetch = throttle.Fixed(max)
}

dagst.defaultReservationSize = int64(defaultReservation)
if dagst.automatedGCEnabled {
if cfg.AutomatedGCConfig.DefaultReservationSize != 0 {
dagst.defaultReservationSize = cfg.AutomatedGCConfig.DefaultReservationSize
}

dagst.gcs = cfg.AutomatedGCConfig.GarbeCollectionStrategy
dagst.maxTransientDirSize = cfg.AutomatedGCConfig.MaxTransientDirSize
dagst.transientsGCWatermarkHigh = cfg.AutomatedGCConfig.TransientsGCWatermarkHigh
dagst.transientsGCWatermarkLow = cfg.AutomatedGCConfig.TransientsGCWatermarkLow
dagst.automatedgcTraceCh = cfg.AutomatedGCConfig.AutomatedGCTraceCh

// default reservation size should not exceed the maximum transient directory size
if dagst.maxTransientDirSize <= dagst.defaultReservationSize {
dagst.defaultReservationSize = dagst.maxTransientDirSize / 10
}
}

var err error
dagst.totalTransientDirSize, err = dagst.transientDirSize()
if err != nil {
return nil, fmt.Errorf("failed to calculate transient dir size: %w", err)
}

return dagst, nil
}

// validateAutomatedGCConfig reports whether cfg can be used to run the
// automated GC. It returns a descriptive error for the first problem found,
// or nil when the config is acceptable.
//
// A config is rejected when:
//   - cfg itself or its garbage collection strategy is nil;
//   - either watermark is zero, negative, NaN or greater than 1 (the
//     watermarks are proportions of MaxTransientDirSize);
//   - the high watermark is not strictly greater than the low watermark;
//   - MaxTransientDirSize is zero or negative;
//   - DefaultReservationSize is negative (zero means "use the default").
func validateAutomatedGCConfig(cfg *AutomatedGCConfig) error {
	if cfg == nil {
		return errors.New("automated GC config cannot be empty since automated GC has been enabled")
	}

	if cfg.GarbeCollectionStrategy == nil {
		return errors.New("garbage collection strategy should not be nil when automated GC is enabled")
	}

	low, high := cfg.TransientsGCWatermarkLow, cfg.TransientsGCWatermarkHigh
	if low == 0 || high == 0 {
		return errors.New("high or low watermark cannot be zero")
	}

	// Written as negated range checks on purpose: every comparison with NaN
	// is false, so this form rejects NaN as well as out-of-range values.
	if !(low > 0 && low <= 1) || !(high > 0 && high <= 1) {
		return errors.New("high and low watermarks should be proportions in the range (0, 1]")
	}

	if high <= low {
		return errors.New("high water mark should be greater than low watermark")
	}

	if cfg.MaxTransientDirSize == 0 {
		return errors.New("max transient directory size should not be zero")
	}

	if cfg.MaxTransientDirSize < 0 {
		return errors.New("max transient directory size should not be negative")
	}

	if cfg.DefaultReservationSize < 0 {
		return errors.New("default reservation size should not be negative")
	}

	return nil
}

// Start starts a DAG store.
func (d *DAGStore) Start(ctx context.Context) error {
if err := d.restoreState(); err != nil {
Expand Down Expand Up @@ -302,8 +437,13 @@ func (d *DAGStore) Start(ctx context.Context) error {
toRegister = append(toRegister, s)
}
}
// all shards are reclaimable in the beginning
d.gcs.NotifyReclaimable(s.key)
}

// do an automated GC if needed
d.automatedGCIfNeeded()

// spawn the control goroutine.
d.wg.Add(1)
go d.control()
Expand Down Expand Up @@ -365,6 +505,10 @@ type RegisterOpts struct {
// has acknowledged the inclusion of the shard, without waiting for any
// indexing to happen.
LazyInitialization bool

// ReservationOpts are used to configure the reservation mechanism when downloading
// transients whose size is not known upfront when automated GC is enabled.
ReservationOpts []mount.ReservationGatedDownloaderOpt
}

// RegisterShard initiates the registration of a new shard.
Expand All @@ -380,7 +524,8 @@ func (d *DAGStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.M
}

// wrap the original mount in an upgrader.
upgraded, err := mount.Upgrade(mnt, d.throttleReaadyFetch, d.config.TransientsDir, key.String(), opts.ExistingTransient)
downloader := d.downloader(key, 0, opts.ReservationOpts...)
upgraded, err := mount.Upgrade(mnt, d.throttleReaadyFetch, d.config.TransientsDir, key.String(), opts.ExistingTransient, downloader)
if err != nil {
d.lk.Unlock()
return err
Expand Down Expand Up @@ -475,15 +620,17 @@ func (d *DAGStore) RecoverShard(ctx context.Context, key shard.Key, out chan Sha
}

// Trace describes a single operation on a shard: the shard's key, the
// operation that was performed, and a ShardInfo for that shard.
//
// NOTE(review): After presumably holds the shard's info once the operation
// has been applied, and TransientDirSizeCounter presumably carries the DAG
// store's transient directory size counter at that moment — confirm against
// the code that publishes traces.
type Trace struct {
Key shard.Key
Op OpType
After ShardInfo
Key shard.Key
Op OpType
After ShardInfo
TransientDirSizeCounter int64
}

// ShardInfo is the snapshot of a shard handed out by GetShardInfo and
// AllShardsInfo: its state, its error (if any), its reference count and the
// size recorded for its transient.
type ShardInfo struct {
ShardState
Error error
refs uint32
Error error
refs uint32
TransientSize int64
}

// GetShardInfo returns the current state of shard with key k.
Expand All @@ -498,7 +645,7 @@ func (d *DAGStore) GetShardInfo(k shard.Key) (ShardInfo, error) {
}

s.lk.RLock()
info := ShardInfo{ShardState: s.state, Error: s.err, refs: s.refs}
info := ShardInfo{ShardState: s.state, Error: s.err, refs: s.refs, TransientSize: s.transientSize}
s.lk.RUnlock()
return info, nil
}
Expand All @@ -514,7 +661,7 @@ func (d *DAGStore) AllShardsInfo() AllShardsInfo {
ret := make(AllShardsInfo, len(d.shards))
for k, s := range d.shards {
s.lk.RLock()
info := ShardInfo{ShardState: s.state, Error: s.err, refs: s.refs}
info := ShardInfo{ShardState: s.state, Error: s.err, refs: s.refs, TransientSize: s.transientSize}
s.lk.RUnlock()
ret[k] = info
}
Expand Down Expand Up @@ -601,3 +748,25 @@ func (d *DAGStore) failShard(s *Shard, ch chan *task, format string, args ...int
err := fmt.Errorf(format, args...)
return d.queueTask(&task{op: OpShardFail, shard: s, err: err}, ch)
}

// transientDirSize walks the configured transients directory and returns the
// combined size of every non-directory entry found beneath it.
//
// If the walk fails, the error is returned together with whatever size had
// been accumulated up to that point.
func (d *DAGStore) transientDirSize() (int64, error) {
	var total int64

	visit := func(_ string, fi os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			// abort the walk on the first error reported for an entry.
			return walkErr
		case fi.IsDir():
			// directories themselves do not count towards the size.
			return nil
		}
		total += fi.Size()
		return nil
	}

	walkErr := filepath.Walk(d.config.TransientsDir, visit)
	return total, walkErr
}

// downloader picks the transient downloader to use for the shard with the
// given key.
//
// With automated GC disabled it returns a plain mount.SimpleDownloader. With
// automated GC enabled it returns a reservation-gated downloader built from
// the key, the known transient size, a transientAllocator wrapping this DAG
// store, and the supplied options.
func (d *DAGStore) downloader(key shard.Key, knownTransientSize int64, opts ...mount.ReservationGatedDownloaderOpt) mount.TransientDownloader {
	if !d.automatedGCEnabled {
		return &mount.SimpleDownloader{}
	}
	return mount.NewReservationGatedDownloader(key, knownTransientSize, &transientAllocator{d}, opts...)
}
Loading