From e9b49f8638618424b3251351558a44c9a79aeed6 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Thu, 30 Sep 2021 13:10:18 +0200 Subject: [PATCH] first draft block store --- ipld/read_test.go | 4 +-- ipld/write.go | 6 ++-- service/block/event.go | 1 + service/block/event_test.go | 58 +++++++++++++++++++++++-------------- service/block/interface.go | 5 ++++ service/block/service.go | 5 +++- service/block/store.go | 21 ++++++++++++++ service/block/store_test.go | 30 +++++++++++++++++++ service/block/types.go | 5 ++++ 9 files changed, 109 insertions(+), 26 deletions(-) create mode 100644 service/block/store.go create mode 100644 service/block/store_test.go diff --git a/ipld/read_test.go b/ipld/read_test.go index c8a7061e78..2c0518fd45 100644 --- a/ipld/read_test.go +++ b/ipld/read_test.go @@ -127,7 +127,7 @@ func TestRetrieveBlockData(t *testing.T) { // // generate EDS eds := generateRandEDS(t, tc.squareSize) - shares := convertEDStoShares(eds) + shares := ConvertEDStoShares(eds) in, err := PutData(ctx, shares, dag) require.NoError(t, err) @@ -158,7 +158,7 @@ func Test_ConvertEDStoShares(t *testing.T) { eds, err := rsmt2d.ComputeExtendedDataSquare(rawshares, rsmt2d.NewRSGF8Codec(), tree.Constructor) require.NoError(t, err) - resshares := convertEDStoShares(eds) + resshares := ConvertEDStoShares(eds) require.Equal(t, rawshares, resshares) } diff --git a/ipld/write.go b/ipld/write.go index 37046b24ca..e4a58a52ad 100644 --- a/ipld/write.go +++ b/ipld/write.go @@ -21,7 +21,6 @@ func PutData(ctx context.Context, shares [][]byte, adder ipld.NodeAdder) (*rsmt2 batchAdder := NewNmtNodeAdder(ctx, ipld.NewBatch(ctx, adder)) // create the nmt wrapper to generate row and col commitments squareSize := uint32(math.Sqrt(float64(len(shares)))) - fmt.Println("square size when put data", squareSize) tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize), nmt.NodeVisitor(batchAdder.Visit)) // recompute the eds eds, err := rsmt2d.ComputeExtendedDataSquare(shares, rsmt2d.NewRSGF8Codec(), tree.Constructor) @@ -34,7 +33,10 @@ func PutData(ctx context.Context, shares [][]byte, adder ipld.NodeAdder) (*rsmt2 return eds, batchAdder.Commit() } -func convertEDStoShares(eds *rsmt2d.ExtendedDataSquare) [][]byte { +// ConvertEDStoShares converts an ExtendedDataSquare back into its original shares. This +// is a helper function for circumstances where PutData must be used after the EDS has already +// been generated. +func ConvertEDStoShares(eds *rsmt2d.ExtendedDataSquare) [][]byte { origWidth := eds.Width() / 2 origShares := make([][]byte, origWidth*origWidth) for i := uint(0); i < origWidth; i++ { diff --git a/service/block/event.go b/service/block/event.go index e358ead34a..0f2e9e68b3 100644 --- a/service/block/event.go +++ b/service/block/event.go @@ -73,5 +73,6 @@ func (s *Service) handleRawBlock(raw *RawBlock) error { return err } // TODO @renaynay: store extended block + return nil } diff --git a/service/block/event_test.go b/service/block/event_test.go index c667a56a6c..b38432ea12 100644 --- a/service/block/event_test.go +++ b/service/block/event_test.go @@ -5,15 +5,17 @@ import ( "testing" "github.com/celestiaorg/celestia-core/testutils" - "github.com/celestiaorg/celestia-node/service/header" + md "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/service/header" ) func Test_listenForNewBlocks(t *testing.T) { mockFetcher := &mockFetcher{ mockNewBlockCh: make(chan *RawBlock), } - serv := NewBlockService(mockFetcher) + serv := NewBlockService(mockFetcher, md.Mock()) // TODO @renaynay: add mock dag service ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -46,26 +48,40 @@ func (m *mockFetcher) UnsubscribeNewBlockEvent(ctx context.Context) error { } func (m *mockFetcher) generateBlocks(t *testing.T, num int) { + t.Helper() + for i := 0; i < num; i++ { - data, err := testutils.GenerateRandomBlockData(1, 1, 1, 1, 40) - if err != nil { - t.Fatal(err) - } - rawBlock := &RawBlock{ - Data: data, - } - // extend the data to get the data hash - extendedData, err := extendBlockData(rawBlock) - if err != nil { - t.Fatal(err) - } - dah, err := header.DataAvailabilityHeaderFromExtendedData(extendedData) - if err != nil { - t.Fatal(err) - } - rawBlock.Header = header.RawHeader{ - DataHash: dah.Hash(), - } + rawBlock, _ := generateRawAndExtendedBlock(t) m.mockNewBlockCh <- rawBlock } } + +func generateRawAndExtendedBlock(t *testing.T) (*RawBlock, *Block) { + t.Helper() + + data, err := testutils.GenerateRandomBlockData(1, 1, 1, 1, 40) + if err != nil { + t.Fatal(err) + } + rawBlock := &RawBlock{ + Data: data, + } + // extend the data to get the data hash + extendedData, err := extendBlockData(rawBlock) + if err != nil { + t.Fatal(err) + } + dah, err := header.DataAvailabilityHeaderFromExtendedData(extendedData) + if err != nil { + t.Fatal(err) + } + rawBlock.Header = header.RawHeader{ + DataHash: dah.Hash(), + } + return rawBlock, &Block{ + header: &header.ExtendedHeader{ + DAH: &dah, + }, + data: extendedData, + } +} diff --git a/service/block/interface.go b/service/block/interface.go index ad8917e24d..c29b0e8a59 100644 --- a/service/block/interface.go +++ b/service/block/interface.go @@ -10,3 +10,8 @@ type Fetcher interface { SubscribeNewBlockEvent(ctx context.Context) (<-chan *RawBlock, error) UnsubscribeNewBlockEvent(ctx context.Context) error } + +type Store interface { + GetBlockByHash(hash []byte) (*Block, error) + Store(block *Block) error +} diff --git a/service/block/service.go b/service/block/service.go index 04f33e8cde..c2ea327e18 100644 --- a/service/block/service.go +++ b/service/block/service.go @@ -3,6 +3,7 @@ package block import ( "context" + ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" ) @@ -14,14 +15,16 @@ import ( // 4. Serving erasure coded blocks to other `Full` node peers. type Service struct { fetcher Fetcher + store ipld.DAGService } var log = logging.Logger("block-service") // NewBlockService creates a new instance of block Service. -func NewBlockService(fetcher Fetcher) *Service { +func NewBlockService(fetcher Fetcher, store ipld.DAGService) *Service { return &Service{ fetcher: fetcher, + store: store, } } diff --git a/service/block/store.go b/service/block/store.go new file mode 100644 index 0000000000..7da1ff32be --- /dev/null +++ b/service/block/store.go @@ -0,0 +1,21 @@ +package block + +import ( + "context" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-core/pkg/da" + "github.com/celestiaorg/celestia-node/ipld" +) + +func (s *Service) StoreBlockData(ctx context.Context, data *ExtendedBlockData) error { + shares := ipld.ConvertEDStoShares(data) + // TODO @renaynay: it's inefficient that we generate the EDS twice + _, err := ipld.PutData(ctx, shares, s.store) + return err +} + +func (s *Service) GetBlockData(ctx context.Context, dah *da.DataAvailabilityHeader) (*ExtendedBlockData, error) { + return ipld.RetrieveData(ctx, dah, s.store, rsmt2d.NewRSGF8Codec()) +} diff --git a/service/block/store_test.go b/service/block/store_test.go new file mode 100644 index 0000000000..a42132a879 --- /dev/null +++ b/service/block/store_test.go @@ -0,0 +1,30 @@ +package block + +import ( + "context" + "testing" + + md "github.com/ipfs/go-merkledag/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestService_BlockStore(t *testing.T) { + // create mock block service (fetcher is not necessary here) + mockStore := md.Mock() + serv := NewBlockService(nil, mockStore) + + _, block := generateRawAndExtendedBlock(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := serv.StoreBlockData(ctx, block.Data()) + require.NoError(t, err) + + eds, err := serv.GetBlockData(ctx, block.Header().DAH) + require.NoError(t, err) + assert.Equal(t, block.data.Width(), eds.Width()) + assert.Equal(t, block.data.RowRoots(), eds.RowRoots()) + assert.Equal(t, block.data.ColRoots(), eds.ColRoots()) +} diff --git a/service/block/types.go b/service/block/types.go index c8361b1541..a0847f9f4c 100644 --- a/service/block/types.go +++ b/service/block/types.go @@ -38,6 +38,11 @@ func (b *Block) LastCommit() *core.Commit { return b.lastCommit } +// DataSize returns the size of the ExtendedBlockData. +func (b *Block) DataSize() uint { + return b.data.Width() +} + // BadEncodingError contains all relevant information to // generate a BadEncodingFraudProof. type BadEncodingError struct{}