Skip to content
This repository was archived by the owner on Oct 5, 2023. It is now read-only.

Commit f34a5f6

Browse files
committed
Reimplement DAG as DAGService
1 parent 163b25f commit f34a5f6

File tree

3 files changed

+77
-37
lines changed

3 files changed

+77
-37
lines changed

api.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010

1111
"github.com/ipfs/go-ipfs/core/coreapi/interface"
1212
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
13+
14+
"github.com/ipfs/go-ipld-format"
1315
homedir "github.com/mitchellh/go-homedir"
1416
ma "github.com/multiformats/go-multiaddr"
1517
manet "github.com/multiformats/go-multiaddr-net"
@@ -137,8 +139,8 @@ func (api *HttpApi) Block() iface.BlockAPI {
137139
return (*BlockAPI)(api)
138140
}
139141

140-
func (api *HttpApi) Dag() iface.DagAPI {
141-
return (*DagAPI)(api)
142+
func (api *HttpApi) Dag() format.DAGService {
143+
return (*HttpDagServ)(api)
142144
}
143145

144146
func (api *HttpApi) Name() iface.NameAPI {

apifile.go

+5
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ func (it *apiIter) Name() string {
125125
}
126126

127127
func (it *apiIter) Next() bool {
128+
if it.ctx.Err() != nil {
129+
it.err = it.ctx.Err()
130+
return false
131+
}
132+
128133
var out lsOutput
129134
if err := it.dec.Decode(&out); err != nil {
130135
if err != io.EOF {

dag.go

+68-35
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,104 @@
11
package httpapi
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
6-
"io"
7-
"math"
7+
"io/ioutil"
8+
"sync"
89

910
"github.com/ipfs/go-ipfs/core/coreapi/interface"
10-
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
11+
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
1112

13+
"github.com/ipfs/go-block-format"
1214
"github.com/ipfs/go-cid"
1315
"github.com/ipfs/go-ipld-format"
14-
mh "github.com/multiformats/go-multihash"
1516
)
1617

17-
type DagAPI HttpApi
18+
type HttpDagServ HttpApi
1819

19-
func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (iface.ResolvedPath, error) {
20-
options, err := caopts.DagPutOptions(opts...)
20+
func (api *HttpDagServ) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
21+
r, err := api.core().Block().Get(ctx, iface.IpldPath(c))
2122
if err != nil {
2223
return nil, err
2324
}
2425

25-
codec, ok := cid.CodecToStr[options.Codec]
26-
if !ok {
27-
return nil, fmt.Errorf("unknowm codec %d", options.MhType)
26+
data, err := ioutil.ReadAll(r)
27+
if err != nil {
28+
return nil, err
2829
}
2930

30-
if options.MhLength != -1 {
31-
return nil, fmt.Errorf("setting hash len is not supported yet")
31+
blk, err := blocks.NewBlockWithCid(data, c)
32+
if err != nil {
33+
return nil, err
3234
}
3335

34-
var out struct {
35-
Cid cid.Cid
36+
return format.DefaultBlockDecoder.Decode(blk)
37+
}
38+
39+
func (api *HttpDagServ) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption {
40+
out := make(chan *format.NodeOption)
41+
wg := sync.WaitGroup{}
42+
wg.Add(len(cids))
43+
44+
for _, c := range cids {
45+
// TODO: Consider limiting concurrency of this somehow
46+
go func() {
47+
defer wg.Done()
48+
n, err := api.Get(ctx, c)
49+
50+
select {
51+
case out <- &format.NodeOption{Node: n, Err: err}:
52+
case <-ctx.Done():
53+
}
54+
}()
3655
}
37-
req := api.core().request("dag/put").
38-
Option("format", codec).
39-
Option("input-enc", options.InputEnc)
40-
41-
if options.MhType != math.MaxUint64 {
42-
mht, ok := mh.Codes[options.MhType]
43-
if !ok {
44-
return nil, fmt.Errorf("unknowm mhType %d", options.MhType)
45-
}
46-
req.Option("hash", mht)
56+
return out
57+
}
58+
59+
func (api *HttpDagServ) Add(ctx context.Context, nd format.Node) error {
60+
c := nd.Cid()
61+
prefix := c.Prefix()
62+
format := cid.CodecToStr[prefix.Codec]
63+
if prefix.Version == 0 {
64+
format = "v0"
4765
}
4866

49-
err = req.FileBody(src).Exec(ctx, &out)
67+
stat, err := api.core().Block().Put(ctx, bytes.NewReader(nd.RawData()),
68+
options.Block.Hash(prefix.MhType, prefix.MhLength), options.Block.Format(format))
5069
if err != nil {
51-
return nil, err
70+
return err
5271
}
53-
54-
return iface.IpldPath(out.Cid), nil
72+
if !stat.Path().Cid().Equals(c) {
73+
return fmt.Errorf("cids didn't match - local %s, remote %s", c.String(), stat.Path().Cid().String())
74+
}
75+
return nil
5576
}
5677

57-
func (api *DagAPI) Get(ctx context.Context, path iface.Path) (format.Node, error) {
58-
panic("implement me")
78+
func (api *HttpDagServ) AddMany(ctx context.Context, nds []format.Node) error {
79+
for _, nd := range nds {
80+
// TODO: optimize
81+
if err := api.Add(ctx, nd); err != nil {
82+
return err
83+
}
84+
}
85+
return nil
5986
}
6087

61-
func (api *DagAPI) Tree(ctx context.Context, path iface.Path, opts ...caopts.DagTreeOption) ([]iface.Path, error) {
62-
panic("implement me")
88+
func (api *HttpDagServ) Remove(ctx context.Context, c cid.Cid) error {
89+
return api.core().Block().Rm(ctx, iface.IpldPath(c)) //TODO: should we force rm?
6390
}
6491

65-
func (api *DagAPI) Batch(ctx context.Context) iface.DagBatch {
66-
panic("implement me")
92+
func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error {
93+
for _, c := range cids {
94+
// TODO: optimize
95+
if err := api.Remove(ctx, c); err != nil {
96+
return err
97+
}
98+
}
99+
return nil
67100
}
68101

69-
func (api *DagAPI) core() *HttpApi {
102+
func (api *HttpDagServ) core() *HttpApi {
70103
return (*HttpApi)(api)
71104
}

0 commit comments

Comments
 (0)