Skip to content

Commit

Permalink
feat: Refactor indexes to put storage considerations on consumers
Browse files Browse the repository at this point in the history
There is no way I can make a safe implementation of the parser by slurping
things into memory; the indexes people use are just too big.

So I made a new API which forces consumers to manage that.
They can choose to use a bytes.Reader, *os.File, mmaped thing, ...
  • Loading branch information
Jorropo authored and masih committed Jul 6, 2022
1 parent 32c1008 commit 3923d31
Show file tree
Hide file tree
Showing 18 changed files with 549 additions and 111 deletions.
24 changes: 24 additions & 0 deletions v2/blockstore/insertionindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2/index"
internalio "github.com/ipld/go-car/v2/internal/io"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/petar/GoLLRB/llrb"
Expand Down Expand Up @@ -121,6 +122,20 @@ func (ii *insertionIndex) Marshal(w io.Writer) (uint64, error) {
return l, err
}

// ForEach invokes f once per record in the index, in ascending key order,
// passing the record's multihash and byte offset. Iteration stops at the
// first error returned by f, and that error is propagated to the caller.
func (ii *insertionIndex) ForEach(f func(multihash.Multihash, uint64) error) error {
	var iterErr error
	visit := func(item llrb.Item) bool {
		record := item.(recordDigest).Record
		if err := f(record.Cid.Hash(), record.Offset); err != nil {
			iterErr = err
			return false // halt iteration on first error
		}
		return true
	}
	ii.items.AscendGreaterOrEqual(ii.items.Min(), visit)
	return iterErr
}

func (ii *insertionIndex) Unmarshal(r io.Reader) error {
var length int64
if err := binary.Read(r, binary.LittleEndian, &length); err != nil {
Expand All @@ -137,6 +152,15 @@ func (ii *insertionIndex) Unmarshal(r io.Reader) error {
return nil
}

// UnmarshalLazyRead decodes the index from r and reports how many bytes
// were consumed. Note that despite the name, this implementation delegates
// to Unmarshal and therefore reads the data eagerly; the returned size is
// the reader's position after decoding.
func (ii *insertionIndex) UnmarshalLazyRead(r io.ReaderAt) (int64, error) {
	reader := internalio.NewOffsetReadSeeker(r, 0)
	if err := ii.Unmarshal(reader); err != nil {
		return 0, err
	}
	// The current seek position equals the number of bytes decoded.
	return reader.Seek(0, io.SeekCurrent)
}

// Codec returns the multicodec code identifying the insertion-index format.
func (ii *insertionIndex) Codec() multicodec.Code {
return insertionIndexCodec
}
Expand Down
13 changes: 10 additions & 3 deletions v2/blockstore/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,11 @@ func OpenReadOnly(path string, opts ...carv2.Option) (*ReadOnly, error) {
}

// readBlock reads the block (CID and raw data) stored at the given byte
// offset within the backing CARv1 payload. Errors from constructing the
// offset reader are propagated rather than silently ignored.
func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) {
	r, err := internalio.NewOffsetReadSeekerWithError(b.backing, idx)
	if err != nil {
		return cid.Cid{}, nil, err
	}
	return util.ReadNode(r, b.opts.ZeroLengthSectionAsEOF)
}

// DeleteBlock is unsupported and always errors.
Expand Down Expand Up @@ -441,7 +444,11 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
}
}

thisItemForNxt := rdr.Offset()
thisItemForNxt, err := rdr.Seek(0, io.SeekCurrent)
if err != nil {
maybeReportError(ctx, err)
return
}
_, c, err := cid.CidFromReader(rdr)
if err != nil {
maybeReportError(ctx, err)
Expand Down
23 changes: 15 additions & 8 deletions v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ func AllowDuplicatePuts(allow bool) carv2.Option {
// successfully. On resumption the roots argument and WithDataPadding option must match the
// previous instantiation of ReadWrite blockstore that created the file. More explicitly, the file
// resuming from must:
// 1. start with a complete CARv2 car.Pragma.
// 2. contain a complete CARv1 data header with root CIDs matching the CIDs passed to the
// constructor, starting at offset optionally padded by WithDataPadding, followed by zero or
// more complete data sections. If any corrupt data sections are present the resumption will fail.
// Note, if set previously, the blockstore must use the same WithDataPadding option as before,
// since this option is used to locate the CARv1 data payload.
// 1. start with a complete CARv2 car.Pragma.
// 2. contain a complete CARv1 data header with root CIDs matching the CIDs passed to the
// constructor, starting at offset optionally padded by WithDataPadding, followed by zero or
// more complete data sections. If any corrupt data sections are present the resumption will fail.
// Note, if set previously, the blockstore must use the same WithDataPadding option as before,
// since this option is used to locate the CARv1 data payload.
//
// Note, resumption should be used with WithCidDeduplication, so that blocks that are successfully
// written into the file are not re-written. Unless, the user explicitly wants duplicate blocks.
Expand Down Expand Up @@ -139,7 +139,10 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.Option) (*ReadWri
offset = 0
}
rwbs.dataWriter = internalio.NewOffsetWriter(rwbs.f, offset)
v1r := internalio.NewOffsetReadSeeker(rwbs.f, offset)
v1r, err := internalio.NewOffsetReadSeekerWithError(rwbs.f, offset)
if err != nil {
return nil, err
}
rwbs.ronly.backing = v1r
rwbs.ronly.idx = rwbs.idx
rwbs.ronly.carv2Closer = rwbs.f
Expand Down Expand Up @@ -190,7 +193,11 @@ func (b *ReadWrite) resumeWithRoots(v2 bool, roots []cid.Cid) error {
// Check if file was finalized by trying to read the CARv2 header.
// We check because if finalized the CARv1 reader behaviour needs to be adjusted since
// EOF will not signify end of CARv1 payload. i.e. index is most likely present.
_, err = headerInFile.ReadFrom(internalio.NewOffsetReadSeeker(b.f, carv2.PragmaSize))
r, err := internalio.NewOffsetReadSeekerWithError(b.f, carv2.PragmaSize)
if err != nil {
return err
}
_, err = headerInFile.ReadFrom(r)

// If reading CARv2 header succeeded, and CARv1 offset in header is not zero then the file is
// most-likely finalized. Check padding and truncate the file to remove index.
Expand Down
23 changes: 9 additions & 14 deletions v2/blockstore/readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,20 +842,15 @@ func TestOpenReadWrite_WritesIdentityCIDsWhenOptionIsEnabled(t *testing.T) {
expectedOffset := len(object) + 1

// Assert index is iterable and has exactly one record with expected multihash and offset.
switch idx := gotIdx.(type) {
case index.IterableIndex:
var i int
err := idx.ForEach(func(mh multihash.Multihash, offset uint64) error {
i++
require.Equal(t, idmh, mh)
require.Equal(t, uint64(expectedOffset), offset)
return nil
})
require.NoError(t, err)
require.Equal(t, 1, i)
default:
require.Failf(t, "unexpected index type", "wanted %v but got %v", multicodec.CarMultihashIndexSorted, idx.Codec())
}
var count int
err = gotIdx.ForEach(func(mh multihash.Multihash, offset uint64) error {
count++
require.Equal(t, idmh, mh)
require.Equal(t, uint64(expectedOffset), offset)
return nil
})
require.NoError(t, err)
require.Equal(t, 1, count)
}

func TestOpenReadWrite_ErrorsWhenWritingTooLargeOfACid(t *testing.T) {
Expand Down
10 changes: 7 additions & 3 deletions v2/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,15 @@ func FuzzIndex(f *testing.F) {
if err != nil {
return
}
index := subject.IndexReader()
if index == nil {
indexRdr := subject.IndexReader()
if indexRdr == nil {
return
}
data, err := io.ReadAll(index)
_, n, err := index.ReadFromWithSize(indexRdr)
if err != nil {
return
}
data, err := io.ReadAll(io.NewSectionReader(indexRdr, 0, n))
if err != nil {
f.Fatal(err)
}
Expand Down
50 changes: 31 additions & 19 deletions v2/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ type (
// Marshal encodes the index in serial form.
Marshal(w io.Writer) (uint64, error)
// Unmarshal decodes the index from its serial form.
// Deprecated: This function is slurpy and will copy everything into memory.
Unmarshal(r io.Reader) error

// UnmarshalLazyRead is the safe alternative to Unmarshal.
// Instead of slurping it will keep a reference to the io.ReaderAt passed in
// and ask for data as needed.
UnmarshalLazyRead(r io.ReaderAt) (indexSize int64, err error)

// Load inserts a number of records into the index.
// Note that Index will load all given records. Any filtering of the records such as
// exclusion of CIDs with multihash.IDENTITY code must occur prior to calling this function.
Expand All @@ -66,18 +72,6 @@ type (
// meaning that no callbacks happen,
// ErrNotFound is returned.
GetAll(cid.Cid, func(uint64) bool) error
}

// IterableIndex extends Index in cases where the Index is able to
// provide an iterator for getting the list of all multihashes in the
// index.
//
// Note that it is possible for an index to contain multiple offsets for
// a given multihash.
//
// See: IterableIndex.ForEach, Index.GetAll.
IterableIndex interface {
Index

// ForEach takes a callback function that will be called
// on each entry in the index. The arguments to the callback are
Expand All @@ -93,6 +87,12 @@ type (
// The order of calls to the given function is deterministic, but entirely index-specific.
ForEach(func(multihash.Multihash, uint64) error) error
}

// IterableIndex is an index which supports iterating over its elements.
// Deprecated: IterableIndex has been moved into Index. Just use Index now.
IterableIndex interface {
Index
}
)

// GetFirst is a wrapper over Index.GetAll, returning the offset for the first
Expand Down Expand Up @@ -136,18 +136,30 @@ func WriteTo(idx Index, w io.Writer) (uint64, error) {
// ReadFrom reads index from r.
// The reader decodes the index by reading the first byte to interpret the encoding.
// Returns error if the encoding is not known.
func ReadFrom(r io.Reader) (Index, error) {
code, err := varint.ReadUvarint(internalio.ToByteReader(r))
func ReadFrom(r io.ReaderAt) (Index, error) {
idx, _, err := ReadFromWithSize(r)
return idx, err
}

// ReadFromWithSize is just like ReadFrom but also returns the size of the Index.
// The size is only valid when err == nil.
func ReadFromWithSize(r io.ReaderAt) (Index, int64, error) {
code, err := varint.ReadUvarint(internalio.NewOffsetReadSeeker(r, 0))
if err != nil {
return nil, err
return nil, 0, err
}
codec := multicodec.Code(code)
idx, err := New(codec)
if err != nil {
return nil, err
return nil, 0, err
}
rdr, err := internalio.NewOffsetReadSeekerWithError(r, int64(varint.UvarintSize(code)))
if err != nil {
return nil, 0, err
}
if err := idx.Unmarshal(r); err != nil {
return nil, err
n, err := idx.UnmarshalLazyRead(rdr)
if err != nil {
return nil, 0, err
}
return idx, nil
return idx, n, nil
}
Loading

0 comments on commit 3923d31

Please sign in to comment.