diff --git a/hamt/hamt.go b/hamt/hamt.go index 0b474289b..3714c30a2 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -24,7 +24,6 @@ import ( "context" "fmt" "os" - "sync" bitfield "github.com/Stebalien/go-bitfield" cid "github.com/ipfs/go-cid" @@ -400,21 +399,16 @@ func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func // EnumLinks collects all links in the Shard. func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) { var links []*ipld.Link - var setlk sync.Mutex - getLinks := makeAsyncTrieGetLinks(ds.dserv, func(sv *Shard) error { - lnk := sv.val - lnk.Name = sv.key - setlk.Lock() - links = append(links, lnk) - setlk.Unlock() - return nil - }) - - cset := cid.NewSet() + linkResults := ds.EnumLinksAsync(ctx) - err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit) - return links, err + for linkResult := range linkResults { + if linkResult.Err != nil { + return links, linkResult.Err + } + links = append(links, linkResult.Link) + } + return links, nil } // ForEachLink walks the Shard and calls the given function. @@ -427,10 +421,28 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro }) } +// EnumLinksAsync returns a channel which will receive Links in the directory +// as they are enumerated, where order is not gauranteed +func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { + linkResults := make(chan format.LinkResult) + ctx, cancel := context.WithCancel(ctx) + go func() { + defer close(linkResults) + defer cancel() + getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults) + cset := cid.NewSet() + err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit) + if err != nil { + emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) + } + }() + return linkResults +} + // makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync // to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called // on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation -func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard *Shard) error) dag.GetLinks { +func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks { return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) { node, err := dagService.Get(ctx, currentCid) @@ -458,16 +470,31 @@ func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard * if err != nil { return nil, err } - err = onShardValue(sv) - if err != nil { - return nil, err - } + formattedLink := sv.val + formattedLink.Name = sv.key + emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil}) } } return childShards, nil } } +func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) { + // make sure that context cancel is processed first + // the reason is due to the concurrency of EnumerateChildrenAsync + // it's possible for EnumLinksAsync to complete and close the linkResults + // channel before this code runs + select { + case <-ctx.Done(): + return + default: + } + select { + case linkResults <- r: + case <-ctx.Done(): + } +} + func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error { for idx := range ds.children { c, err := ds.getChild(ctx, idx) diff --git a/hamt/hamt_test.go b/hamt/hamt_test.go index ffbb676eb..1483fcd9f 100644 --- a/hamt/hamt_test.go +++ b/hamt/hamt_test.go @@ -74,28 +74,7 @@ func assertLink(s *Shard, name string, found bool) error { } } -func assertSerializationWorks(ds ipld.DAGService, s *Shard) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - nd, err := s.Node() - if err != nil { - return err - } - - nds, err := NewHamtFromDag(ds, nd) - if err != nil { - return err - } - - linksA, err := s.EnumLinks(ctx) - if err != nil { - return err - } - - linksB, err := nds.EnumLinks(ctx) - if err != nil { - return err - } +func assertLinksEqual(linksA []*ipld.Link, linksB []*ipld.Link) error { if len(linksA) != len(linksB) { return fmt.Errorf("links arrays are different sizes") @@ -121,6 +100,32 @@ func assertSerializationWorks(ds ipld.DAGService, s *Shard) error { return nil } +func assertSerializationWorks(ds ipld.DAGService, s *Shard) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + nd, err := s.Node() + if err != nil { + return err + } + + nds, err := NewHamtFromDag(ds, nd) + if err != nil { + return err + } + + linksA, err := s.EnumLinks(ctx) + if err != nil { + return err + } + + linksB, err := nds.EnumLinks(ctx) + if err != nil { + return err + } + + return assertLinksEqual(linksA, linksB) +} + func TestBasicSet(t *testing.T) { ds := mdtest.Mock() for _, w := range []int{128, 256, 512, 1024, 2048, 4096} { @@ -309,6 +314,46 @@ func TestSetAfterMarshal(t *testing.T) { } } +func TestEnumLinksAsync(t *testing.T) { + ds := mdtest.Mock() + _, s, err := makeDir(ds, 300) + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + nd, err := s.Node() + if err != nil { + t.Fatal(err) + } + + nds, err := NewHamtFromDag(ds, nd) + if err != nil { + t.Fatal(err) + } + + linksA, err := nds.EnumLinks(ctx) + if err != nil { + t.Fatal(err) + } + + linkResults := nds.EnumLinksAsync(ctx) + + var linksB []*ipld.Link + + for linkResult := range linkResults { + if linkResult.Err != nil { + t.Fatal(linkResult.Err) + } + linksB = append(linksB, linkResult.Link) + } + + err = assertLinksEqual(linksA, linksB) + if err != nil { + t.Fatal(err) + } +} + func TestDuplicateAddShard(t *testing.T) { ds := mdtest.Mock() dir, _ := NewShard(ds, 256) diff --git a/io/directory.go b/io/directory.go index aa1ec8de7..2e0227623 100644 --- a/io/directory.go +++ b/io/directory.go @@ -6,6 +6,7 @@ import ( "os" mdag "github.com/ipfs/go-merkledag" + format "github.com/ipfs/go-unixfs" hamt "github.com/ipfs/go-unixfs/hamt" @@ -38,6 +39,10 @@ type Directory interface { // ForEachLink applies the given function to Links in the directory. ForEachLink(context.Context, func(*ipld.Link) error) error + // EnumLinksAsync returns a channel which will receive Links in the directory + // as they are enumerated, where order is not gauranteed + EnumLinksAsync(context.Context) <-chan format.LinkResult + // Links returns the all the links in the directory node. Links(context.Context) ([]*ipld.Link, error) @@ -141,6 +146,26 @@ func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.No return d.node.AddNodeLink(name, node) } +// EnumLinksAsync returns a channel which will receive Links in the directory +// as they are enumerated, where order is not gauranteed +func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { + linkResults := make(chan format.LinkResult) + go func() { + defer close(linkResults) + for _, l := range d.node.Links() { + select { + case linkResults <- format.LinkResult{ + Link: l, + Err: nil, + }: + case <-ctx.Done(): + return + } + } + }() + return linkResults +} + // ForEachLink implements the `Directory` interface. func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error { for _, l := range d.node.Links() { @@ -226,6 +251,12 @@ func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) erro return d.shard.ForEachLink(ctx, f) } +// EnumLinksAsync returns a channel which will receive Links in the directory +// as they are enumerated, where order is not gauranteed +func (d *HAMTDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { + return d.shard.EnumLinksAsync(ctx) +} + // Links implements the `Directory` interface. func (d *HAMTDirectory) Links(ctx context.Context) ([]*ipld.Link, error) { return d.shard.EnumLinks(ctx) diff --git a/io/directory_test.go b/io/directory_test.go index 64a1ef2c6..12c481753 100644 --- a/io/directory_test.go +++ b/io/directory_test.go @@ -5,7 +5,9 @@ import ( "fmt" "testing" + ipld "github.com/ipfs/go-ipld-format" mdtest "github.com/ipfs/go-merkledag/test" + ft "github.com/ipfs/go-unixfs" ) @@ -155,4 +157,28 @@ func TestDirBuilder(t *testing.T) { if len(links) != count { t.Fatal("wrong number of links", len(links), count) } + + linkResults := dir.EnumLinksAsync(ctx) + + asyncNames := make(map[string]bool) + var asyncLinks []*ipld.Link + + for linkResult := range linkResults { + if linkResult.Err != nil { + t.Fatal(linkResult.Err) + } + asyncNames[linkResult.Link.Name] = true + asyncLinks = append(asyncLinks, linkResult.Link) + } + + for i := 0; i < count; i++ { + n := fmt.Sprintf("entry %d", i) + if !asyncNames[n] { + t.Fatal("COULDNT FIND: ", n) + } + } + + if len(asyncLinks) != count { + t.Fatal("wrong number of links", len(asyncLinks), count) + } } diff --git a/unixfs.go b/unixfs.go index 7b4189153..4ee755186 100644 --- a/unixfs.go +++ b/unixfs.go @@ -9,9 +9,18 @@ import ( proto "github.com/gogo/protobuf/proto" dag "github.com/ipfs/go-merkledag" + + ipld "github.com/ipfs/go-ipld-format" pb "github.com/ipfs/go-unixfs/pb" ) +// A LinkResult for any parallel enumeration of links +// TODO: Should this live in go-ipld-format? +type LinkResult struct { + Link *ipld.Link + Err error +} + // Shorthands for protobuffer types const ( TRaw = pb.Data_Raw