Skip to content
This repository was archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
blockstore: Adding Stat method to map from Cid to BlockSize
Browse files Browse the repository at this point in the history
Performant way to map from Cid to BlockSize. 

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
  • Loading branch information
taylormike committed Jul 11, 2018
1 parent fe4126e commit 3e9f68b
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 24 deletions.
50 changes: 29 additions & 21 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
}

func (b *arccache) DeleteBlock(k *cid.Cid) error {
if has, ok := b.hasCached(k); ok && !has {
if has, _, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}

b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
switch err {
case nil, ds.ErrNotFound, ErrNotFound:
b.addCache(k, false)
b.addCache(k, 0)
return err
default:
return err
Expand All @@ -51,33 +51,41 @@ func (b *arccache) DeleteBlock(k *cid.Cid) error {

// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *arccache) hasCached(k *cid.Cid) (has bool, ok bool) {
func (b *arccache) hasCached(k *cid.Cid) (has bool, size uint, ok bool) {
b.total.Inc()
if k == nil {
log.Error("nil cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, false
return false, 0, false
}

h, ok := b.arc.Get(k.KeyString())
if ok {
b.hits.Inc()
return h.(bool), true
if h.(uint) > 0 {
return true, h.(uint), true
} else {
return false, h.(uint), true
}
}
return false, false
return false, 0, false
}

func (b *arccache) Has(k *cid.Cid) (bool, error) {
if has, ok := b.hasCached(k); ok {
return has, nil
}
blockSize, err := b.Stat(k)
return blockSize > 0, err
}

res, err := b.blockstore.Has(k)
func (b *arccache) Stat(k *cid.Cid) (uint, error) {
if _, blockSize, ok := b.hasCached(k); ok {
return blockSize, nil
}
blockSize, err := b.blockstore.Stat(k)
if err == nil {
b.addCache(k, res)
b.addCache(k, blockSize)
}
return res, err
return blockSize, err
}

func (b *arccache) Get(k *cid.Cid) (blocks.Block, error) {
Expand All @@ -86,27 +94,27 @@ func (b *arccache) Get(k *cid.Cid) (blocks.Block, error) {
return nil, ErrNotFound
}

if has, ok := b.hasCached(k); ok && !has {
if has, _, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
}

bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.addCache(k, false)
b.addCache(k, 0)
} else if bl != nil {
b.addCache(k, true)
b.addCache(k, uint(len(bl.RawData())))
}
return bl, err
}

func (b *arccache) Put(bl blocks.Block) error {
if has, ok := b.hasCached(bl.Cid()); ok && has {
if has, _, ok := b.hasCached(bl.Cid()); ok && has {
return nil
}

err := b.blockstore.Put(bl)
if err == nil {
b.addCache(bl.Cid(), true)
b.addCache(bl.Cid(), uint(len(bl.RawData())))
}
return err
}
Expand All @@ -116,7 +124,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
}
}
Expand All @@ -125,7 +133,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
return err
}
for _, block := range good {
b.addCache(block.Cid(), true)
b.addCache(block.Cid(), uint(len(block.RawData())))
}
return nil
}
Expand All @@ -134,8 +142,8 @@ func (b *arccache) HashOnRead(enabled bool) {
b.blockstore.HashOnRead(enabled)
}

func (b *arccache) addCache(c *cid.Cid, has bool) {
b.arc.Add(c.KeyString(), has)
func (b *arccache) addCache(c *cid.Cid, blockSize uint) {
b.arc.Add(c.KeyString(), blockSize)
}

func (b *arccache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
Expand Down
21 changes: 20 additions & 1 deletion arc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"testing"

"github.com/ipfs/go-block-format"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -107,6 +107,9 @@ func TestGetFillsCache(t *testing.T) {
if has, err := arc.Has(exampleBlock.Cid()); has || err != nil {
t.Fatal("has was true but there is no such block")
}
if blockSize, err := arc.Stat(exampleBlock.Cid()); blockSize > 0 || err != nil {
t.Fatal("stat was true but there is no such block")
}

untrap(cd)

Expand All @@ -119,12 +122,16 @@ func TestGetFillsCache(t *testing.T) {
if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil {
t.Fatal("has returned invalid result")
}
if blockSize, err := arc.Stat(exampleBlock.Cid()); blockSize == 0 || err != nil {
t.Fatal("stat returned invalid result")
}
}

func TestGetAndDeleteFalseShortCircuit(t *testing.T) {
arc, _, cd := createStores(t)

arc.Has(exampleBlock.Cid())
arc.Stat(exampleBlock.Cid())

trap("get hit datastore", cd, t)

Expand Down Expand Up @@ -167,6 +174,17 @@ func TestHasAfterSucessfulGetIsCached(t *testing.T) {
arc.Has(exampleBlock.Cid())
}

func TestStatAfterSucessfulGetIsCached(t *testing.T) {
arc, bs, cd := createStores(t)

bs.Put(exampleBlock)

arc.Get(exampleBlock.Cid())

trap("has hit datastore", cd, t)
arc.Stat(exampleBlock.Cid())
}

func TestDifferentKeyObjectsWork(t *testing.T) {
arc, bs, cd := createStores(t)

Expand All @@ -191,6 +209,7 @@ func TestPutManyCaches(t *testing.T) {

trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Cid())
arc.Stat(exampleBlock.Cid())
untrap(cd)
arc.DeleteBlock(exampleBlock.Cid())

Expand Down
18 changes: 18 additions & 0 deletions blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Blockstore interface {
Has(*cid.Cid) (bool, error)
Get(*cid.Cid) (blocks.Block, error)

// Stat returns the CIDs mapped BlockSize
Stat(*cid.Cid) (uint, error)

// Put puts a given block to the underlying datastore
Put(blocks.Block) error

Expand Down Expand Up @@ -183,6 +186,21 @@ func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
return bs.datastore.Has(dshelp.CidToDsKey(k))
}

func (bs *blockstore) Stat(k *cid.Cid) (uint, error) {
maybeData, err := bs.datastore.Get(dshelp.CidToDsKey(k))
if err == ds.ErrNotFound {
return 0, nil
}
if err != nil {
return 0, err
}
bdata, ok := maybeData.([]byte)
if !ok {
return 0, ErrValueTypeMismatch
}
return uint(len(bdata)), nil
}

func (bs *blockstore) DeleteBlock(k *cid.Cid) error {
err := bs.datastore.Delete(dshelp.CidToDsKey(k))
if err == ds.ErrNotFound {
Expand Down
18 changes: 18 additions & 0 deletions blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ func TestPutThenGetBlock(t *testing.T) {
}
}

func TestPutThenStatBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))

err := bs.Put(block)
if err != nil {
t.Fatal(err)
}

blockSize, err := bs.Stat(block.Cid())
if err != nil {
t.Fatal(err)
}
if uint(len(block.RawData())) != blockSize {
t.Fail()
}
}

func TestHashOnRead(t *testing.T) {
orginalDebug := u.Debug
defer (func() {
Expand Down
4 changes: 4 additions & 0 deletions bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func (b *bloomcache) Has(k *cid.Cid) (bool, error) {
return b.blockstore.Has(k)
}

func (b *bloomcache) Stat(k *cid.Cid) (uint, error) {
return b.blockstore.Stat(k)
}

func (b *bloomcache) Get(k *cid.Cid) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
Expand Down
12 changes: 10 additions & 2 deletions bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,23 @@ func TestPutManyAddsToBloom(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !has {
blockSize, err := cachedbs.Stat(block1.Cid())
if err != nil {
t.Fatal(err)
}
if blockSize == 0 || !has {
t.Fatal("added block is reported missing")
}

has, err = cachedbs.Has(block2.Cid())
if err != nil {
t.Fatal(err)
}
if has {
blockSize, err = cachedbs.Stat(block2.Cid())
if err != nil {
t.Fatal(err)
}
if blockSize > 0 || has {
t.Fatal("not added block is reported to be in blockstore")
}
}
Expand Down
8 changes: 8 additions & 0 deletions idstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func (b *idstore) Has(k *cid.Cid) (bool, error) {
return b.bs.Has(k)
}

func (b *idstore) Stat(k *cid.Cid) (uint, error) {
isId, bdata := extractContents(k)
if isId {
return uint(len(bdata)), nil
}
return b.bs.Stat(k)
}

func (b *idstore) Get(k *cid.Cid) (blocks.Block, error) {
isId, bdata := extractContents(k)
if isId {
Expand Down
10 changes: 10 additions & 0 deletions idstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func TestIdStore(t *testing.T) {
t.Fatal("normal block not added to datastore")
}

blockSize, _ := ids.Stat(hash1)
if blockSize == 0 {
t.Fatal("normal block not added to datastore")
}

_, err = ids.Get(hash1)
if err != nil {
t.Fatal(err)
Expand All @@ -78,6 +83,11 @@ func TestIdStore(t *testing.T) {
t.Fatal("normal block not deleted from datastore")
}

blockSize, _ = ids.Stat(hash1)
if blockSize > 0 {
t.Fatal("normal block not deleted from datastore")
}

idhash2, _ := cid.NewPrefixV1(cid.Raw, mh.ID).Sum([]byte("idhash2"))
idblock2, _ := blk.NewBlockWithCid([]byte("idhash2"), idhash2)
hash2, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("hash2"))
Expand Down

0 comments on commit 3e9f68b

Please sign in to comment.