diff --git a/das/daser_test.go b/das/daser_test.go index 14a49bba35..700255ee49 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -6,7 +6,12 @@ import ( "testing" "time" + offlineexchange "github.com/ipfs/go-ipfs-exchange-offline" + + "github.com/celestiaorg/celestia-node/edsstore" + "github.com/ipfs/go-blockservice" + bsrv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" mdutils "github.com/ipfs/go-merkledag/test" @@ -259,7 +264,8 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) { t.Cleanup(cancel) ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - bServ := mdutils.Bserv() + bstore, _ := edsstore.NewEDSStore(context.Background(), "/tmp/car", ds) + bServ := bsrv.New(bstore, offlineexchange.Exchange(bstore)) // create mock network net, err := mocknet.FullMeshLinked(1) require.NoError(t, err) @@ -267,7 +273,7 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) { ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0], pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) require.NoError(t, err) - avail := share.TestFullAvailability(bServ) + avail := share.TestFullAvailability(bServ, bstore) // 15 headers from the past and 15 future headers mockGet, shareServ, sub, _ := createDASerSubcomponents(t, bServ, 15, 15, avail) diff --git a/edsstore/edsstore.go b/edsstore/edsstore.go new file mode 100644 index 0000000000..066926a205 --- /dev/null +++ b/edsstore/edsstore.go @@ -0,0 +1,249 @@ +package edsstore + +import ( + "context" + "errors" + "os" + "sync" + + carBlockstore "github.com/ipld/go-car/v2/blockstore" + + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/index" + "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/dagstore/shard" + lru "github.com/hashicorp/golang-lru" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + ipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log/v2" +) + +var _ blockstore.Blockstore = (*EDSStore)(nil) + +var ( + // TODO(distractedm1nd): Can probably merge this with share log + edsStoreLog = logging.Logger("edsstore") + // TODO(distractedm1nd): This should also be a config value + maxCacheSize = 100 + ErrUnsupportedOperation = errors.New("unsupported operation") + ErrMultipleShardsFound = errors.New("found more than one shard with the provided cid") +) + +type DAGStore = dagstore.DAGStore + +type accessorWithBlockstore struct { + sa *dagstore.ShardAccessor + bs dagstore.ReadBlockstore +} + +// EDSStore implements the blockstore interface on a DAGStore. +// The lru cache approach is heavily inspired by the open PR filecoin-project/dagstore/116. +// The main differences to the implementation here are that we do not support multiple shards per key, +// call GetSize directly on the underlying RO blockstore, and do not throw errors on Put/PutMany. +type EDSStore struct { + DAGStore + // ctx tracks the running context of the underlying DAGStore + ctx context.Context + basePath string + + bsStripedLocks [256]sync.Mutex + // caches the blockstore for a given shard for shard read affinity i.e. + // further reads will likely be from the same shard. Maps (shard key -> blockstore). + blockstoreCache *lru.Cache +} + +func NewEDSStore(ctx context.Context, basePath string, ds datastore.Batching) (*EDSStore, error) { + // instantiate the blockstore cache + bslru, err := lru.NewWithEvict(maxCacheSize, func(_ interface{}, val interface{}) { + // ensure we close the blockstore for a shard when it's evicted from the cache so dagstore can gc it. + abs := val.(*accessorWithBlockstore) + abs.sa.Close() + }) + if err != nil { + panic("could not create lru cache for read only blockstores") + } + + // create mount registry (what types of mounts are supported) + r := mount.NewRegistry() + err = r.Register("fs", &mount.FSMount{FS: os.DirFS(basePath + "/blocks/")}) + + if err != nil { + panic(err) + } + + fsRepo, err := index.NewFSRepo(basePath + "/index/") + if err != nil { + panic(err) + } + dagStore, err := dagstore.NewDAGStore( + dagstore.Config{ + TransientsDir: basePath + "/transients/", + IndexRepo: fsRepo, + Datastore: ds, + MountRegistry: r, + TopLevelIndex: index.NewInverted(ds), + }, + ) + if err != nil { + return nil, err + } + err = dagStore.Start(ctx) + if err != nil { + return nil, err + } + //err = logging.SetLogLevel("edsstore", "debug") + //if err != nil { + // panic(err) + //} + bs := &EDSStore{ + DAGStore: *dagStore, + ctx: ctx, + basePath: basePath, + blockstoreCache: bslru, + } + return bs, nil +} + +func (edsStore *EDSStore) GetCARBlockstore(key string, roots []cid.Cid) (*carBlockstore.ReadWrite, error) { + return carBlockstore.OpenReadWrite(edsStore.basePath+"/blocks/"+key, roots, carBlockstore.AllowDuplicatePuts(true)) +} + +// TODO: key shouldnt be string, but is temporarily while I figure some things out +func (edsStore *EDSStore) FinalizeCAR(car *carBlockstore.ReadWrite, key string) error { + err := car.Finalize() + if err != nil { + edsStoreLog.Errorw("couldn't finalize", "key", key, "err", err) + return err + } + err = edsStore.RegisterShard(context.Background(), shard.KeyFromString(key), &mount.FSMount{ + FS: os.DirFS(edsStore.basePath + "/blocks/"), + Path: key, + }, nil, dagstore.RegisterOpts{}) + if err != nil { + edsStoreLog.Warnw("couldn't register shard", "key", key, "err", err) + } + return nil +} + +func (edsStore *EDSStore) Has(ctx context.Context, c cid.Cid) (bool, error) { + keys, err := edsStore.ShardsContainingMultihash(ctx, c.Hash()) + if err != nil { + return false, err + } + return len(keys) > 0, nil +} + +func (edsStore *EDSStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + bs, err := edsStore.getReadOnlyBlockstore(ctx, c) + if err != nil { + return nil, ipld.ErrNotFound{Cid: c} + } + // TODO: if bs.Get returns an error and it is from the cache, we should remove it from the cache + return bs.Get(ctx, c) +} + +func (edsStore *EDSStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + bs, err := edsStore.getReadOnlyBlockstore(ctx, c) + if err != nil { + return 0, err + } + return bs.GetSize(ctx, c) +} + +// Put needs to not return an error because it is called by the exchange +func (edsStore *EDSStore) Put(ctx context.Context, block blocks.Block) error { + return nil +} + +// PutMany needs to not return an error because it is called by the exchange +func (edsStore *EDSStore) PutMany(ctx context.Context, blocks []blocks.Block) error { + return nil +} + +func (edsStore *EDSStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, ErrUnsupportedOperation +} + +func (edsStore *EDSStore) HashOnRead(enabled bool) { + panic(ErrUnsupportedOperation) +} + +func (edsStore *EDSStore) DeleteBlock(context.Context, cid.Cid) error { + return ErrUnsupportedOperation +} + +func (edsStore *EDSStore) getReadOnlyBlockstore(ctx context.Context, c cid.Cid) (dagstore.ReadBlockstore, error) { + keys, err := edsStore.ShardsContainingMultihash(ctx, c.Hash()) + if err != nil { + return nil, err + } + if len(keys) > 1 { + return nil, ErrMultipleShardsFound + } + + // try to fetch from cache + shardKey := keys[0] + bs, err := edsStore.readFromBSCache(shardKey) + if err == nil && bs != nil { + return bs, nil + } + + // wasn't found in cache, so acquire it and add to cache + ch := make(chan dagstore.ShardResult, 1) + err = edsStore.AcquireShard(ctx, shardKey, ch, dagstore.AcquireOpts{}) + if err != nil { + return nil, err + } + result := <-ch + + blockStore, err := edsStore.addToBSCache(shardKey, result.Accessor) + if err != nil { + return nil, err + } + + return blockStore, err +} + +func (edsStore *EDSStore) readFromBSCache(shardContainingCid shard.Key) (dagstore.ReadBlockstore, error) { + lk := &edsStore.bsStripedLocks[shardKeyToStriped(shardContainingCid)] + lk.Lock() + defer lk.Unlock() + + // We've already ensured that the given shard has the cid/multihash we are looking for. + val, ok := edsStore.blockstoreCache.Get(shardContainingCid) + if !ok { + return nil, errors.New("not found in cache") + } + + rbs := val.(*accessorWithBlockstore).bs + edsStoreLog.Debugw("read blockstore from cache", "key", shardContainingCid) + return rbs, nil +} + +func (edsStore *EDSStore) addToBSCache( + shardContainingCid shard.Key, + accessor *dagstore.ShardAccessor, +) (dagstore.ReadBlockstore, error) { + lk := &edsStore.bsStripedLocks[shardKeyToStriped(shardContainingCid)] + lk.Lock() + defer lk.Unlock() + + blockStore, err := accessor.Blockstore() + if err != nil { + return nil, err + } + + edsStore.blockstoreCache.Add(shardContainingCid, &accessorWithBlockstore{ + bs: blockStore, + sa: accessor, + }) + edsStoreLog.Debugw("added blockstore to cache", "key", shardContainingCid) + return blockStore, nil +} + +func shardKeyToStriped(sk shard.Key) byte { + return sk.String()[len(sk.String())-1] +} diff --git a/edsstore/nodeadder.go b/edsstore/nodeadder.go new file mode 100644 index 0000000000..6abda4a3c5 --- /dev/null +++ b/edsstore/nodeadder.go @@ -0,0 +1,89 @@ +package edsstore + +import ( + "context" + "fmt" + + blocks "github.com/ipfs/go-block-format" + blockstore "github.com/ipfs/go-ipfs-blockstore" + exchange "github.com/ipfs/go-ipfs-exchange-interface" + format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-verifcid" +) + +// CARNodeAdder is needed to fulfill the NodeAdder interface +type CARNodeAdder struct { + exchange exchange.Interface + blockstore blockstore.Blockstore +} + +func New(bs blockstore.Blockstore, ex exchange.Interface) *CARNodeAdder { + return &CARNodeAdder{ + exchange: ex, + blockstore: bs, + } +} + +func (na *CARNodeAdder) Add(ctx context.Context, nd format.Node) error { + if na == nil { // FIXME remove this assertion. protect with constructor invariant + return fmt.Errorf("dagService is nil") + } + return na.AddBlock(ctx, nd) +} + +func (na *CARNodeAdder) AddMany(ctx context.Context, nds []format.Node) error { + blks := make([]blocks.Block, len(nds)) + for i, nd := range nds { + blks[i] = nd + } + return na.AddBlocks(ctx, blks) +} + +// AddBlock adds a particular block to the service, Putting it into the datastore. +func (na *CARNodeAdder) AddBlock(ctx context.Context, o blocks.Block) error { + c := o.Cid() + // hash security + err := verifcid.ValidateCid(c) + if err != nil { + return err + } + + if err := na.blockstore.Put(ctx, o); err != nil { + return err + } + + if err := na.exchange.HasBlock(ctx, o); err != nil { + panic("Couldnt add block") + } + + return nil +} + +func (na *CARNodeAdder) AddBlocks(ctx context.Context, blks []blocks.Block) error { + // hash security + for _, b := range blks { + err := verifcid.ValidateCid(b.Cid()) + if err != nil { + return err + } + } + + toput := blks + + if len(toput) == 0 { + return nil + } + + err := na.blockstore.PutMany(ctx, toput) + if err != nil { + edsStoreLog.Warn("failed to put blocks in CAR: ", err) + return err + } + + for _, o := range toput { + if err := na.exchange.HasBlock(ctx, o); err != nil { + panic("Couldnt add block") + } + } + return nil +} diff --git a/fraud/testing.go b/fraud/testing.go index 386f2bceff..1b96e8edb3 100644 --- a/fraud/testing.go +++ b/fraud/testing.go @@ -62,7 +62,7 @@ func generateByzantineError( bServ blockservice.BlockService, ) (*header.ExtendedHeader, error) { faultHeader := header.CreateFraudExtHeader(t, h, bServ) - rtrv := ipld.NewRetriever(bServ) + rtrv := ipld.NewBasicRetriever(bServ) _, err := rtrv.Retrieve(ctx, faultHeader.DAH) return faultHeader, err } diff --git a/go.mod b/go.mod index 014a9a2069..242fa251b7 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/cosmos/cosmos-sdk/api v0.1.0 github.com/cosmos/ibc-go/v4 v4.0.0-rc0 github.com/dgraph-io/badger/v2 v2.2007.4 + github.com/filecoin-project/dagstore v0.5.3 github.com/gammazero/workerpool v1.1.2 github.com/gogo/protobuf v1.3.3 github.com/gorilla/mux v1.8.0 @@ -33,6 +34,8 @@ require ( github.com/ipfs/go-ipld-format v0.4.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/go-merkledag v0.6.0 + github.com/ipfs/go-verifcid v0.0.1 + github.com/ipld/go-car/v2 v2.4.1 github.com/libp2p/go-libp2p v0.20.3 github.com/libp2p/go-libp2p-core v0.17.0 github.com/libp2p/go-libp2p-kad-dht v0.16.0 @@ -163,8 +166,7 @@ require ( github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-metrics-interface v0.0.1 // indirect github.com/ipfs/go-peertaskqueue v0.7.0 // indirect - github.com/ipfs/go-verifcid v0.0.1 // indirect - github.com/ipld/go-codec-dagpb v1.3.0 // indirect + github.com/ipld/go-codec-dagpb v1.3.1 // indirect github.com/ipld/go-ipld-prime v0.16.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect @@ -211,7 +213,7 @@ require ( github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multibase v0.0.3 // indirect - github.com/multiformats/go-multicodec v0.4.1 // indirect + github.com/multiformats/go-multicodec v0.5.0 // indirect github.com/multiformats/go-multistream v0.3.3 // indirect github.com/multiformats/go-varint v0.0.6 // indirect github.com/nxadm/tail v1.4.8 // indirect @@ -221,6 +223,7 @@ require ( github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.2 // indirect + github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -247,6 +250,7 @@ require ( github.com/tendermint/tm-db v0.6.7 // indirect github.com/ulikunitz/xz v0.5.8 // indirect github.com/vivint/infectious v0.0.0-20200605153912-25a574ae18a3 // indirect + github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect @@ -262,6 +266,7 @@ require ( go.uber.org/dig v1.14.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect + golang.org/x/exp v0.0.0-20210714144626-1041f73d31d8 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect diff --git a/go.sum b/go.sum index 468f93c852..bc0bac2cc1 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,7 @@ cosmossdk.io/math v1.0.0-beta.2 h1:17hSVc9ne1c31IaLDfjRojtN+y4Rd2N8H/6Fht2sBzw= cosmossdk.io/math v1.0.0-beta.2/go.mod h1:u/MXvf8wbUbCsAEyQSSYXXMsczAsFX48e2D6JI86T4o= dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBrvjyP0v+ecvNYvCpyZgu5/xkfAUhi6wJj28eUfSU= dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4= dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU= @@ -95,6 +96,7 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig= @@ -235,6 +237,7 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534 h1:rtAn27wIbmOGUs7RIbVgPEjb31ehTVniDwPGXyMxm5U= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -332,6 +335,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/filecoin-project/dagstore v0.5.3 h1:++s4pEW/NvHph0N8sCdz7NokU0Y3r2yVB5SFaDTLLWM= +github.com/filecoin-project/dagstore v0.5.3/go.mod h1:mdqKzYrRBHf1pRMthYfMv3n37oOw0Tkx7+TxPt240M0= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= @@ -341,6 +346,7 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -372,6 +378,7 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -603,12 +610,15 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= +github.com/ipfs/go-bitfield v1.0.0 h1:y/XHm2GEmD9wKngheWNNCNL0pzrWXZwCdQGv1ikXknQ= +github.com/ipfs/go-bitswap v0.5.1/go.mod h1:P+ckC87ri1xFLvk74NlXdP0Kj9RmWAh4+H78sC6Qopo= github.com/ipfs/go-bitswap v0.6.0/go.mod h1:Hj3ZXdOC5wBJvENtdqsixmzzRukqd8EHLxZLZc3mzRA= github.com/ipfs/go-bitswap v0.7.0 h1:vSte4lll4Rob7cMQERUouxtFbuD7Vl4Hq+XEAp2ipKY= github.com/ipfs/go-bitswap v0.7.0/go.mod h1:Hj3ZXdOC5wBJvENtdqsixmzzRukqd8EHLxZLZc3mzRA= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= github.com/ipfs/go-block-format v0.0.3 h1:r8t66QstRp/pd/or4dpnbVfXT5Gt7lOqRvC+/dDTpMc= github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk= +github.com/ipfs/go-blockservice v0.2.1/go.mod h1:k6SiwmgyYgs4M/qt+ww6amPeUH9EISLRBnvUurKJhi8= github.com/ipfs/go-blockservice v0.3.0 h1:cDgcZ+0P0Ih3sl8+qjFr2sVaMdysg/YZpLj5WJ8kiiw= github.com/ipfs/go-blockservice v0.3.0/go.mod h1:P5ppi8IHDC7O+pA0AlGTF09jruB2h+oP3wVVaZl8sfk= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -647,11 +657,15 @@ github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIyk github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= +github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo= github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q= +github.com/ipfs/go-ipfs-blockstore v0.2.1/go.mod h1:jGesd8EtCM3/zPgx+qr0/feTXGUeRai6adgwC+Q+JvE= +github.com/ipfs/go-ipfs-blockstore v1.1.2/go.mod h1:w51tNR9y5+QXB0wkNcHt4O2aSZjTdqaEWaQdSxEyUOY= github.com/ipfs/go-ipfs-blockstore v1.2.0 h1:n3WTeJ4LdICWs/0VSfjHrlqpPpl6MZ+ySd3j8qz0ykw= github.com/ipfs/go-ipfs-blockstore v1.2.0/go.mod h1:eh8eTFLiINYNSNawfZOC7HOxNTxpB1PFuA5E1m/7exE= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk= +github.com/ipfs/go-ipfs-chunker v0.0.1 h1:cHUUxKFQ99pozdahi+uSC/3Y6HeRpi9oTeUHbE27SEw= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= @@ -660,6 +674,7 @@ github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR9 github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU= github.com/ipfs/go-ipfs-exchange-interface v0.1.0 h1:TiMekCrOGQuWYtZO3mf4YJXDIdNgnKWZ9IE3fGlnWfo= github.com/ipfs/go-ipfs-exchange-interface v0.1.0/go.mod h1:ych7WPlyHqFvCi/uQI48zLZuAWVP5iTQPXEfVaw5WEI= +github.com/ipfs/go-ipfs-exchange-offline v0.1.1/go.mod h1:vTiBRIbzSwDD0OWm+i3xeT0mO7jG2cbJYatp3HPk5XY= github.com/ipfs/go-ipfs-exchange-offline v0.2.0 h1:2PF4o4A7W656rC0RxuhUace997FTcDTcIQ6NoEtyjAI= github.com/ipfs/go-ipfs-exchange-offline v0.2.0/go.mod h1:HjwBeW0dvZvfOMwDP0TSKXIHf2s+ksdP4E3MLDRtLKY= github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY= @@ -695,19 +710,28 @@ github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72g github.com/ipfs/go-log/v2 v2.5.0/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= +github.com/ipfs/go-merkledag v0.5.1/go.mod h1:cLMZXx8J08idkp5+id62iVftUQV+HlYJ3PIhDfZsjA4= github.com/ipfs/go-merkledag v0.6.0 h1:oV5WT2321tS4YQVOPgIrWHvJ0lJobRTerU+i9nmUCuA= github.com/ipfs/go-merkledag v0.6.0/go.mod h1:9HSEwRd5sV+lbykiYP+2NC/3o6MZbKNaa4hfNcH5iH0= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0= github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= -github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSatMGe8= +github.com/ipfs/go-unixfsnode v1.4.0 h1:9BUxHBXrbNi8mWHc6j+5C580WJqtVw9uoeEKn4tMhwA= +github.com/ipld/go-car/v2 v2.1.1/go.mod h1:+2Yvf0Z3wzkv7NeI69i8tuZ+ft7jyjPYIWZzeVNeFcI= +github.com/ipld/go-car/v2 v2.4.1 h1:9S+FYbQzQJ/XzsdiOV13W5Iu/i+gUnr6csbSD9laFEg= +github.com/ipld/go-car/v2 v2.4.1/go.mod h1:zjpRf0Jew9gHqSvjsKVyoq9OY9SWoEKdYCQUKVaaPT0= github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA= +github.com/ipld/go-codec-dagpb v1.3.1 h1:yVNlWRQexCa54ln3MSIiUN++ItH7pdhBFhh0hSgZu1w= +github.com/ipld/go-codec-dagpb v1.3.1/go.mod h1:ErNNglIi5KMur/MfFE/svtgQthzVvf+43MrzLbpcIZY= github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8= +github.com/ipld/go-ipld-prime v0.14.0/go.mod h1:9ASQLwUFLptCov6lIYc70GRB4V7UTyLD0IJtrDJe6ZM= github.com/ipld/go-ipld-prime v0.16.0 h1:RS5hhjB/mcpeEPJvfyj0qbOj/QL+/j05heZ0qa97dVo= github.com/ipld/go-ipld-prime v0.16.0/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHPT2PjoiiU1qA= +github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 h1:TsyATB2ZRRQGTwafJdgEUQkmjOExRV0DNokcihZxbnQ= +github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -761,6 +785,7 @@ github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e github.com/klauspost/compress v1.15.6 h1:6D9PcO8QWu0JyaQ2zUMmu16T1T+zjjEpP91guRsvDfY= github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.0.6/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.12 h1:p9dKCg8i4gmOxtv35DvrYoWqYzQrvEVdjQ762Y0OqZE= github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= @@ -1203,8 +1228,10 @@ github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77 github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= github.com/multiformats/go-multicodec v0.2.0/go.mod h1:/y4YVwkfMyry5kFbMTbLJKErhycTIftytRV+llXdyS4= github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ= -github.com/multiformats/go-multicodec v0.4.1 h1:BSJbf+zpghcZMZrwTYBGwy0CPcVZGWiC72Cp8bBd4R4= +github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ= github.com/multiformats/go-multicodec v0.4.1/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ= +github.com/multiformats/go-multicodec v0.5.0 h1:EgU6cBe/D7WRwQb1KmnBvU7lrcFGMggZVTPtOW9dDHs= +github.com/multiformats/go-multicodec v0.5.0/go.mod h1:DiY2HFaEp5EhEXb/iYzVAunmyX/aSFMxq2KMKfWEues= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po= github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= @@ -1294,6 +1321,8 @@ github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko github.com/pelletier/go-toml/v2 v2.0.2 h1:+jQXlF3scKIcSEKkdHzXhCTDLPFi5r1wnK6yPS+49Gw= github.com/pelletier/go-toml/v2 v2.0.2/go.mod h1:MovirKjgVRESsAvNZlAjtFwV867yGuwRkXbG66OzopI= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= +github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk= +github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= @@ -1374,6 +1403,8 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.21.0/go.mod h1:ZPhntP/xmq1nnND05hhpAh2QMhSsA4UN3MGZ6O2J3hM= github.com/rs/zerolog v1.27.0 h1:1T7qCieN22GVc8S4Q2yuexzBb1EqjbgjSH9RohbMjKs= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -1412,6 +1443,7 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa/go.mod h1:2RVY1rIf+2J2o/IM9+vPq9RzmHDSseB7FoXiSNIUsoU= @@ -1500,8 +1532,11 @@ github.com/warpfork/go-testmark v0.3.0/go.mod h1:jhEf8FVxd+F17juRubpmut64NEG6I2r github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= +github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 h1:5HZfQkwe0mIfyDmc1Em5GqlNRzcdtlv4HTNmdpt7XH0= +github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11/go.mod h1:Wlo/SzPmxVp6vXpGt/zaXhHH0fn4IxgqZc82aKg6bpQ= github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158 h1:WXhVOwj2USAXB5oMDwRl3piOux2XMV9TANaYxXHdkoE= github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= +github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= @@ -1543,6 +1578,7 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk= go.opentelemetry.io/otel v1.8.0 h1:zcvBFizPbpa1q7FehvFiHbQwGzmPILebO0tyqIR5Djg= go.opentelemetry.io/otel v1.8.0/go.mod h1:2pkj+iMj0o03Y+cW6/m8Y4WkRdYN3AvCXCnzRMp9yvM= @@ -1556,12 +1592,16 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.8.0 h1:LrHL1A3KqIgAgi6mK7Q0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.8.0/go.mod h1:w8aZL87GMOvOBa2lU/JlVXE1q4chk/0FX+8ai4513bw= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.8.0 h1:SMO1HopgdAqNRit+WA3w3dcJSGANuH/ihKXDekEHfuY= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.8.0/go.mod h1:tsw+QO2+pGo7xOrPXrS27HxW8uqGQkw5AzJwdsoyvgw= +go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= go.opentelemetry.io/otel/metric v0.31.0 h1:6SiklT+gfWAwWUR0meEMxQBtihpiEs4c+vL9spDTqUs= go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A= +go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= +go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m7hBWC279Yc= go.opentelemetry.io/otel/sdk v1.8.0 h1:xwu69/fNuwbSHWe/0PGS888RmjWY181OmcXDQKu7ZQk= go.opentelemetry.io/otel/sdk v1.8.0/go.mod h1:uPSfc+yfDH2StDM/Rm35WE8gXSNdvCg023J6HeGNO0c= go.opentelemetry.io/otel/sdk/metric v0.31.0 h1:2sZx4R43ZMhJdteKAlKoHvRgrMp53V1aRxvEf5lCq8Q= go.opentelemetry.io/otel/sdk/metric v0.31.0/go.mod h1:fl0SmNnX9mN9xgU6OLYLMBMrNAsaZQi7qBwprwO3abk= +go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU= go.opentelemetry.io/otel/trace v1.8.0 h1:cSy0DF9eGI5WIfNwZ1q2iUyGj00tGzP24dE1lOlHrfY= go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4= @@ -1633,6 +1673,8 @@ golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWP golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= @@ -1642,6 +1684,7 @@ golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= +golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4= golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek= golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= @@ -1649,6 +1692,9 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20210615023648-acb5c1269671/go.mod h1:DVyR6MI7P4kEQgvZJSj1fQGrWIi2RzIrfYWycwheUAc= +golang.org/x/exp v0.0.0-20210714144626-1041f73d31d8 h1:9W9g+Ds9GhvhwTxEohXKdFdWqPq+ZQGVpLMPs1UzQQE= +golang.org/x/exp v0.0.0-20210714144626-1041f73d31d8/go.mod h1:DVyR6MI7P4kEQgvZJSj1fQGrWIi2RzIrfYWycwheUAc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -1666,10 +1712,12 @@ golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= +golang.org/x/mobile v0.0.0-20201217150744-e6ae53a27f4f/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -1943,6 +1991,7 @@ golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200117161641-43d50277825c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200122220014-bf1340f18c4a/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= @@ -2227,6 +2276,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0= lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= diff --git a/header/header.go b/header/header.go index c0240caf29..c8847b0197 100644 --- a/header/header.go +++ b/header/header.go @@ -5,14 +5,14 @@ import ( "context" "fmt" + "github.com/celestiaorg/celestia-node/ipld" + "github.com/ipfs/go-blockservice" logging "github.com/ipfs/go-log/v2" bts "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/pkg/da" core "github.com/tendermint/tendermint/types" - - "github.com/celestiaorg/celestia-node/ipld" ) var log = logging.Logger("header") diff --git a/ipld/add.go b/ipld/add.go index 6b768cb3a0..b971aefa93 100644 --- a/ipld/add.go +++ b/ipld/add.go @@ -26,7 +26,7 @@ func AddShares( squareSize := int(math.Sqrt(float64(len(shares)))) // create nmt adder wrapping batch adder with calculated size bs := batchSize(squareSize * 2) - batchAdder := NewNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs)) + batchAdder := NewBasicNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs)) // create the nmt wrapper to generate row and col commitments tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize), nmt.NodeVisitor(batchAdder.Visit)) // recompute the eds @@ -51,7 +51,7 @@ func ImportShares( squareSize := int(math.Sqrt(float64(len(shares)))) // create nmt adder wrapping batch adder with calculated size bs := batchSize(squareSize * 2) - batchAdder := NewNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs)) + batchAdder := NewBasicNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs)) // create the nmt wrapper to generate row and col commitments tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize/2), nmt.NodeVisitor(batchAdder.Visit)) // recompute the eds diff --git a/ipld/nmt_adder.go b/ipld/nmt_adder.go index 4b49eedfce..4fd98b7866 100644 --- a/ipld/nmt_adder.go +++ b/ipld/nmt_adder.go @@ -4,11 +4,12 @@ import ( "context" "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-merkledag" - "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + "github.com/ipld/go-car/v2/blockstore" + "github.com/celestiaorg/celestia-node/edsstore" "github.com/celestiaorg/celestia-node/ipld/plugin" ) @@ -16,6 +17,10 @@ import ( // into a nmt tree. type NmtNodeAdder struct { ctx context.Context + bs *edsstore.EDSStore + key string + roots []cid.Cid + rw *blockstore.ReadWrite add *ipld.Batch leaves *cid.Set err error @@ -24,7 +29,23 @@ type NmtNodeAdder struct { // NewNmtNodeAdder returns a new NmtNodeAdder with the provided context and // batch. Note that the context provided should have a timeout // It is not thread-safe. -func NewNmtNodeAdder(ctx context.Context, bs blockservice.BlockService, opts ...ipld.BatchOption) *NmtNodeAdder { +func NewNmtNodeAdder(ctx context.Context, roots []cid.Cid, key string, bs blockservice.BlockService, edsStr *edsstore.EDSStore, opts ...ipld.BatchOption) (*NmtNodeAdder, error) { + carBlockStore, err := edsStr.GetCARBlockstore(key, roots) + if err != nil { + return nil, err + } + return &NmtNodeAdder{ + add: ipld.NewBatch(ctx, edsstore.New(carBlockStore, bs.Exchange()), opts...), + bs: edsStr, + key: key, + roots: roots, + rw: carBlockStore, + ctx: ctx, + leaves: cid.NewSet(), + }, nil +} + +func NewBasicNmtNodeAdder(ctx context.Context, bs blockservice.BlockService, opts ...ipld.BatchOption) *NmtNodeAdder { return &NmtNodeAdder{ add: ipld.NewBatch(ctx, merkledag.NewDAGService(bs), opts...), ctx: ctx, @@ -58,5 +79,14 @@ func (n *NmtNodeAdder) Commit() error { return n.err } - return n.add.Commit() + err := n.add.Commit() + if err != nil { + log.Errorw("error committing batch", "key", n.key, "err", err) + } + + if n.bs != nil { + return n.bs.FinalizeCAR(n.rw, n.key) + } else { + return err + } } diff --git a/ipld/retriever.go b/ipld/retriever.go index 65802e0238..45dc45861f 100644 --- a/ipld/retriever.go +++ b/ipld/retriever.go @@ -4,10 +4,13 @@ import ( "context" "encoding/hex" "errors" + "fmt" "sync" "sync/atomic" "time" + "github.com/celestiaorg/celestia-node/edsstore" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" @@ -40,14 +43,20 @@ var tracer = otel.Tracer("ipld") // Retriever randomly picks one of the data square quadrants and tries to request them one by one // until it is able to reconstruct the whole square. type Retriever struct { - bServ blockservice.BlockService + bServ blockservice.BlockService + edsStr *edsstore.EDSStore } -// NewRetriever creates a new instance of the Retriever over IPLD Service and rmst2d.Codec -func NewRetriever(bServ blockservice.BlockService) *Retriever { +// NewBasicRetriever creates a new instance of the Retriever over IPLD Service and rmst2d.Codec +func NewBasicRetriever(bServ blockservice.BlockService) *Retriever { return &Retriever{bServ: bServ} } +// NewRetriever creates a new instance of the Retriever over IPLD Service and rmst2d.Codec +func NewRetriever(bServ blockservice.BlockService, edsStr *edsstore.EDSStore) *Retriever { + return &Retriever{bServ: bServ, edsStr: edsStr} +} + // Retrieve retrieves all the data committed to DataAvailabilityHeader. // // If not available locally, it aims to request from the network only one quadrant (1/4) of the data square @@ -125,11 +134,24 @@ type retrievalSession struct { // newSession creates a new retrieval session and kicks off requesting process. func (r *Retriever) newSession(ctx context.Context, dah *da.DataAvailabilityHeader) (*retrievalSession, error) { size := len(dah.RowsRoots) - adder := NewNmtNodeAdder( + roots := make([]cid.Cid, 0, size*2) + for _, row := range dah.RowsRoots { + roots = append(roots, plugin.MustCidFromNamespacedSha256(row)) + } + for _, row := range dah.ColumnRoots { + roots = append(roots, plugin.MustCidFromNamespacedSha256(row)) + } + adder, err := NewNmtNodeAdder( ctx, + roots, + dah.String(), r.bServ, + r.edsStr, format.MaxSizeBatchOption(batchSize(size)), ) + if err != nil { + return nil, fmt.Errorf("failed to create NmtNodeAdder: %w", err) + } ses := &retrievalSession{ bget: blockservice.NewSession(ctx, r.bServ), adder: adder, diff --git a/ipld/retriever_test.go b/ipld/retriever_test.go index 18e2265d6b..f255838b94 100644 --- a/ipld/retriever_test.go +++ b/ipld/retriever_test.go @@ -30,7 +30,7 @@ func TestRetriever_Retrieve(t *testing.T) { defer cancel() bServ := mdutils.Bserv() - r := NewRetriever(bServ) + r := NewBasicRetriever(bServ) type test struct { name string @@ -80,7 +80,7 @@ func TestRetriever_ByzantineError(t *testing.T) { copy(shares[14][8:], shares[15][8:]) // import corrupted eds - batchAdder := NewNmtNodeAdder(ctx, bserv, format.MaxSizeBatchOption(batchSize(width*2))) + batchAdder := NewBasicNmtNodeAdder(ctx, bserv, format.MaxSizeBatchOption(batchSize(width*2))) tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(width), nmt.NodeVisitor(batchAdder.Visit)) attackerEDS, err := rsmt2d.ImportExtendedDataSquare(shares, DefaultRSMT2DCodec(), tree.Constructor) require.NoError(t, err) @@ -89,7 +89,7 @@ func TestRetriever_ByzantineError(t *testing.T) { // ensure we rcv an error da := da.NewDataAvailabilityHeader(attackerEDS) - r := NewRetriever(bserv) + r := NewBasicRetriever(bserv) _, err = r.Retrieve(ctx, &da) var errByz *ErrByzantine require.ErrorAs(t, err, &errByz) @@ -104,7 +104,7 @@ func TestRetriever_MultipleRandQuadrants(t *testing.T) { defer cancel() bServ := mdutils.Bserv() - r := NewRetriever(bServ) + r := NewBasicRetriever(bServ) // generate EDS shares := RandShares(t, squareSize*squareSize) diff --git a/node/components.go b/node/components.go index 2150dbb72c..d01ca570d0 100644 --- a/node/components.go +++ b/node/components.go @@ -29,7 +29,9 @@ func lightComponents(cfg *Config, store Store) fx.Option { return fx.Options( fx.Supply(Light), baseComponents(cfg, store), + fx.Provide(p2p.DefaultBlockstore), fx.Provide(services.DASer), + fx.Invoke(share.EnsureEmptySquareExists), fx.Provide(services.HeaderExchangeP2P(cfg.Services)), fx.Provide(services.LightAvailability(cfg.Services)), fx.Provide(services.CacheAvailability[*share.LightAvailability]), @@ -42,8 +44,10 @@ func bridgeComponents(cfg *Config, store Store) fx.Option { return fx.Options( fx.Supply(Bridge), baseComponents(cfg, store), + fx.Provide(p2p.DefaultBlockstore), nodecore.Components(cfg.Core), fx.Supply(header.MakeExtendedHeader), + fx.Invoke(share.EnsureEmptySquareExists), fx.Provide(services.FullAvailability(cfg.Services)), fx.Provide(services.CacheAvailability[*share.FullAvailability]), fx.Invoke(func( @@ -62,6 +66,7 @@ func fullComponents(cfg *Config, store Store) fx.Option { return fx.Options( fx.Supply(Full), baseComponents(cfg, store), + fx.Provide(services.EDSStore(cfg.Services)), fx.Provide(services.DASer), fx.Provide(services.HeaderExchangeP2P(cfg.Services)), fx.Provide(services.FullAvailability(cfg.Services)), @@ -81,7 +86,6 @@ func baseComponents(cfg *Config, store Store) fx.Option { fx.Provide(store.Datastore), fx.Provide(store.Keystore), // share components - fx.Invoke(share.EnsureEmptySquareExists), fx.Provide(services.ShareService), // header components fx.Provide(services.HeaderService), diff --git a/node/p2p/bitswap.go b/node/p2p/bitswap.go index 3777f1dea1..2abd1bc478 100644 --- a/node/p2p/bitswap.go +++ b/node/p2p/bitswap.go @@ -27,34 +27,37 @@ const ( defaultARCCacheSize = 64 << 10 ) -// DataExchange provides a constructor for IPFS block's DataExchange over BitSwap. -func DataExchange(cfg Config) func(bitSwapParams) (exchange.Interface, blockstore.Blockstore, error) { - return func(params bitSwapParams) (exchange.Interface, blockstore.Blockstore, error) { - ctx := fxutil.WithLifecycle(params.Ctx, params.Lc) - bs, err := blockstore.CachedBlockstore( - ctx, - blockstore.NewBlockstore(params.Ds), - blockstore.CacheOpts{ - HasBloomFilterSize: defaultBloomFilterSize, - HasBloomFilterHashes: defaultBloomFilterHashes, - HasARCCacheSize: defaultARCCacheSize, - }, - ) - if err != nil { - return nil, nil, err - } - prefix := protocol.ID(fmt.Sprintf("/celestia/%s", params.Net)) - return bitswap.New( - ctx, - network.NewFromIpfsHost(params.Host, &routinghelpers.Null{}, network.Prefix(prefix)), - bs, - bitswap.ProvideEnabled(false), - // NOTE: These below ar required for our protocol to work reliably. - // See https://github.com/celestiaorg/celestia-node/issues/732 - bitswap.SetSendDontHaves(false), - bitswap.SetSimulateDontHavesOnTimeout(false), - ), bs, nil +func DefaultBlockstore(params bitSwapParams) (blockstore.Blockstore, error) { + ctx := fxutil.WithLifecycle(params.Ctx, params.Lc) + bs, err := blockstore.CachedBlockstore( + ctx, + blockstore.NewBlockstore(params.Ds), + blockstore.CacheOpts{ + HasBloomFilterSize: defaultBloomFilterSize, + HasBloomFilterHashes: defaultBloomFilterHashes, + HasARCCacheSize: defaultARCCacheSize, + }, + ) + if err != nil { + return nil, err } + return bs, nil +} + +// DataExchange provides a constructor for IPFS block's DataExchange over BitSwap. +func DataExchange(params bitSwapParams, bs blockstore.Blockstore) (exchange.Interface, error) { + ctx := fxutil.WithLifecycle(params.Ctx, params.Lc) + prefix := protocol.ID(fmt.Sprintf("/celestia/%s", params.Net)) + return bitswap.New( + ctx, + network.NewFromIpfsHost(params.Host, &routinghelpers.Null{}, network.Prefix(prefix)), + bs, + bitswap.ProvideEnabled(false), + // NOTE: These below ar required for our protocol to work reliably. + // See https://github.com/celestiaorg/celestia-node/issues/732 + bitswap.SetSendDontHaves(false), + bitswap.SetSimulateDontHavesOnTimeout(false), + ), nil } type bitSwapParams struct { diff --git a/node/p2p/p2p.go b/node/p2p/p2p.go index 346ec52c8c..3d9344b6b6 100644 --- a/node/p2p/p2p.go +++ b/node/p2p/p2p.go @@ -72,7 +72,7 @@ func Components(cfg Config) fx.Option { fx.Provide(Host(cfg)), fx.Provide(RoutedHost), fx.Provide(PubSub(cfg)), - fx.Provide(DataExchange(cfg)), + fx.Provide(DataExchange), fx.Provide(BlockService), fx.Provide(PeerRouting(cfg)), fx.Provide(ContentRouting), diff --git a/node/services/config.go b/node/services/config.go index 501686d58f..211c2f0448 100644 --- a/node/services/config.go +++ b/node/services/config.go @@ -15,6 +15,9 @@ import ( var log = logging.Logger("node/services") type Config struct { + // BasePath is the path to EDSStore's base directory. + // TODO: this will obviously live somewhere else + BasePath string // TrustedHash is the Block/Header hash that Nodes use as starting point for header synchronization. // Only affects the node once on initial sync. TrustedHash string @@ -34,6 +37,7 @@ type Config struct { func DefaultConfig() Config { return Config{ + BasePath: "/tmp/eds", TrustedHash: "", TrustedPeers: make([]string, 0), PeersLimit: 3, diff --git a/node/services/service.go b/node/services/service.go index 226d4842d0..354205cd8c 100644 --- a/node/services/service.go +++ b/node/services/service.go @@ -5,6 +5,7 @@ import ( "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" @@ -13,6 +14,8 @@ import ( routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing" "go.uber.org/fx" + "github.com/celestiaorg/celestia-node/edsstore" + "github.com/celestiaorg/celestia-node/das" "github.com/celestiaorg/celestia-node/fraud" "github.com/celestiaorg/celestia-node/header" @@ -25,6 +28,24 @@ import ( "github.com/celestiaorg/celestia-node/service/share" ) +// EDSStore creates and initializes a new EDSStore. +// TODO(distractedm1nd): This will live elsewhere once the refactorings are merged +func EDSStore(cfg Config) func(fx.Lifecycle, context.Context, datastore.Batching) (*edsstore.EDSStore, blockstore.Blockstore, error) { + return func(lc fx.Lifecycle, ctx context.Context, ds datastore.Batching) (*edsstore.EDSStore, blockstore.Blockstore, error) { + lifecycleCtx := fxutil.WithLifecycle(ctx, lc) + edsStore, err := edsstore.NewEDSStore(lifecycleCtx, cfg.BasePath, ds) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return edsStore.Start(lifecycleCtx) + }, + OnStop: func(ctx context.Context) error { + return edsStore.Close() + }, + }) + return edsStore, edsStore, err + } +} + // HeaderSyncer creates a new Syncer. func HeaderSyncer( ctx context.Context, @@ -203,12 +224,14 @@ func LightAvailability(cfg Config) func( func FullAvailability(cfg Config) func( lc fx.Lifecycle, bServ blockservice.BlockService, + edsStr *edsstore.EDSStore, r routing.ContentRouting, h host.Host, ) *share.FullAvailability { return func( lc fx.Lifecycle, bServ blockservice.BlockService, + edsStr *edsstore.EDSStore, r routing.ContentRouting, h host.Host, ) *share.FullAvailability { @@ -219,7 +242,7 @@ func FullAvailability(cfg Config) func( cfg.DiscoveryInterval, cfg.AdvertiseInterval, ) - fa := share.NewFullAvailability(bServ, disc) + fa := share.NewFullAvailability(bServ, edsStr, disc) lc.Append(fx.Hook{ OnStart: fa.Start, OnStop: fa.Stop, diff --git a/service/share/full_availability.go b/service/share/full_availability.go index edc1b7eeaa..830a9aa248 100644 --- a/service/share/full_availability.go +++ b/service/share/full_availability.go @@ -7,6 +7,8 @@ import ( "github.com/ipfs/go-blockservice" format "github.com/ipfs/go-ipld-format" + "github.com/celestiaorg/celestia-node/edsstore" + "github.com/celestiaorg/celestia-node/ipld" ) @@ -21,9 +23,9 @@ type FullAvailability struct { } // NewFullAvailability creates a new full Availability. -func NewFullAvailability(bServ blockservice.BlockService, disc *discovery) *FullAvailability { +func NewFullAvailability(bServ blockservice.BlockService, edsStr *edsstore.EDSStore, disc *discovery) *FullAvailability { return &FullAvailability{ - rtrv: ipld.NewRetriever(bServ), + rtrv: ipld.NewRetriever(bServ, edsStr), disc: disc, } } diff --git a/service/share/share.go b/service/share/share.go index 85124bc7e6..41cfebe952 100644 --- a/service/share/share.go +++ b/service/share/share.go @@ -60,7 +60,7 @@ type Service struct { // NewService creates new basic share.Service. func NewService(bServ blockservice.BlockService, avail Availability) *Service { return &Service{ - rtrv: ipld.NewRetriever(bServ), + rtrv: ipld.NewBasicRetriever(bServ), Availability: avail, bServ: bServ, } diff --git a/service/share/testing.go b/service/share/testing.go index 49d21a6107..ae2cd4daeb 100644 --- a/service/share/testing.go +++ b/service/share/testing.go @@ -9,9 +9,11 @@ import ( "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-blockservice" + bsrv "github.com/ipfs/go-blockservice" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" blockstore "github.com/ipfs/go-ipfs-blockstore" + offlineexchange "github.com/ipfs/go-ipfs-exchange-offline" "github.com/ipfs/go-ipfs-routing/offline" mdutils "github.com/ipfs/go-merkledag/test" "github.com/libp2p/go-libp2p-core/host" @@ -23,9 +25,12 @@ import ( "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/pkg/da" + "github.com/celestiaorg/celestia-node/edsstore" "github.com/celestiaorg/celestia-node/ipld" ) +var testingPath = "/tmp/car" + // RandLightServiceWithSquare provides a share.Service filled with 'n' NMT // trees of 'n' random shares, essentially storing a whole square. func RandLightServiceWithSquare(t *testing.T, n int) (*Service, *Root) { @@ -43,8 +48,9 @@ func RandLightService() (*Service, blockservice.BlockService) { // RandFullServiceWithSquare provides a share.Service filled with 'n' NMT // trees of 'n' random shares, essentially storing a whole square. func RandFullServiceWithSquare(t *testing.T, n int) (*Service, *Root) { - bServ := mdutils.Bserv() - return NewService(bServ, TestFullAvailability(bServ)), RandFillBS(t, n, bServ) + bstore, _ := edsstore.NewEDSStore(context.Background(), testingPath, dssync.MutexWrap(ds.NewMapDatastore())) + bServ := bsrv.New(bstore, offlineexchange.Exchange(bstore)) + return NewService(bServ, TestFullAvailability(bServ, bstore)), RandFillDagBS(t, n, bServ, bstore) } // RandLightLocalServiceWithSquare is the same as RandLightServiceWithSquare, except @@ -62,13 +68,20 @@ func RandLightLocalServiceWithSquare(t *testing.T, n int) (*Service, *Root) { // RandFullLocalServiceWithSquare is the same as RandFullServiceWithSquare, except // the Availability is wrapped with CacheAvailability. func RandFullLocalServiceWithSquare(t *testing.T, n int) (*Service, *Root) { - bServ := mdutils.Bserv() ds := dssync.MutexWrap(ds.NewMapDatastore()) + bstore, _ := edsstore.NewEDSStore(context.Background(), testingPath, ds) + bServ := bsrv.New(bstore, offlineexchange.Exchange(bstore)) ca := NewCacheAvailability( - TestFullAvailability(bServ), + TestFullAvailability(bServ, bstore), ds, ) - return NewService(bServ, ca), RandFillBS(t, n, bServ) + return NewService(bServ, ca), RandFillDagBS(t, n, bServ, bstore) +} + +// RandFillBS fills the given BlockService with a random block of a given size. +func RandFillDagBS(t *testing.T, n int, bServ blockservice.BlockService, edsStr *edsstore.EDSStore) *Root { + shares := RandShares(t, n*n) + return FillDagBS(t, bServ, edsStr, shares) } // RandFillBS fills the given BlockService with a random block of a given size. @@ -85,6 +98,15 @@ func FillBS(t *testing.T, bServ blockservice.BlockService, shares []Share) *Root return &dah } +// FillDagBS fills the given BlockService with the given shares. +func FillDagBS(t *testing.T, bServ blockservice.BlockService, edsStr *edsstore.EDSStore, shares []Share) *Root { + //eds, err := ipld.AddSharesToDAGStore(context.TODO(), shares, bServ, edsStr) + eds, err := ipld.AddShares(context.TODO(), shares, bServ) + require.NoError(t, err) + dah := da.NewDataAvailabilityHeader(eds) + return &dah +} + // RandShares provides 'n' randomized shares prefixed with random namespaces. func RandShares(t *testing.T, n int) []Share { return ipld.RandShares(t, n) @@ -97,6 +119,11 @@ type node struct { host.Host } +type fullNode struct { + node + edsstore.EDSStore +} + // ClearStorage cleans up the storage of the node. func (n *node) ClearStorage() { keys, err := n.Blockstore().AllKeysChan(n.net.ctx) @@ -132,9 +159,9 @@ func (dn *dagNet) RandLightNode(squareSize int) (*node, *Root) { } // RandFullNode creates a Full Node filled with a random block of the given size. -func (dn *dagNet) RandFullNode(squareSize int) (*node, *Root) { +func (dn *dagNet) RandFullNode(squareSize int) (*fullNode, *Root) { nd := dn.FullNode() - return nd, RandFillBS(dn.t, squareSize, nd.BlockService) + return nd, RandFillDagBS(dn.t, squareSize, nd.BlockService, &nd.EDSStore) } // LightNode creates a new empty LightAvailability Node. @@ -145,9 +172,9 @@ func (dn *dagNet) LightNode() *node { } // FullNode creates a new empty FullAvailability Node. -func (dn *dagNet) FullNode() *node { - nd := dn.Node() - nd.Service = NewService(nd.BlockService, TestFullAvailability(nd.BlockService)) +func (dn *dagNet) FullNode() *fullNode { + nd := dn.FNode() + nd.Service = NewService(nd.BlockService, TestFullAvailability(nd.node.BlockService, &nd.EDSStore)) return nd } @@ -178,6 +205,35 @@ func (dn *dagNet) Node() *node { return nd } +func (dn *dagNet) FNode() *fullNode { + hst, err := dn.net.GenPeer() + require.NoError(dn.t, err) + dstore := dssync.MutexWrap(ds.NewMapDatastore()) + dagbs, _ := edsstore.NewEDSStore(context.Background(), testingPath, dstore) + routing := offline.NewOfflineRouter(dstore, record.NamespacedValidator{}) + bs := bitswap.New( + dn.ctx, + network.NewFromIpfsHost(hst, routing), + dagbs, + bitswap.ProvideEnabled(false), // disable routines for DHT content provides, as we don't use them + bitswap.EngineBlockstoreWorkerCount(1), // otherwise it spawns 128 routines which is too much for tests + bitswap.EngineTaskWorkerCount(2), + bitswap.TaskWorkerCount(2), + bitswap.SetSimulateDontHavesOnTimeout(false), + bitswap.SetSendDontHaves(false), + ) + nd := &fullNode{ + node: node{ + net: dn, + BlockService: blockservice.New(dagbs, bs), + Host: hst, + }, + EDSStore: *dagbs, + } + dn.nodes = append(dn.nodes, &nd.node) + return nd +} + // ConnectAll connects all the peers on registered on the dagNet. func (dn *dagNet) ConnectAll() { err := dn.net.LinkAll() @@ -222,8 +278,8 @@ func (sn *subNet) LightNode() *node { func (sn *subNet) FullNode() *node { nd := sn.dagNet.FullNode() - sn.nodes = append(sn.nodes, nd) - return nd + sn.nodes = append(sn.nodes, &nd.node) + return &nd.node } func (sn *subNet) ConnectAll() { @@ -272,9 +328,9 @@ func TestLightAvailability(bServ blockservice.BlockService) *LightAvailability { return NewLightAvailability(bServ, disc) } -func TestFullAvailability(bServ blockservice.BlockService) *FullAvailability { +func TestFullAvailability(bServ blockservice.BlockService, edsStr *edsstore.EDSStore) *FullAvailability { disc := NewDiscovery(nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second) - return NewFullAvailability(bServ, disc) + return NewFullAvailability(bServ, edsStr, disc) } type TestSuccessfulAvailability struct {