Skip to content

Commit 7c07e11

Browse files
authored
feat: stream car file bytes from @helia/car (#444)
To better support streaming CAR files with a less confusing API, add a method to `@helia/car` that takes root CIDs and returns an AsyncGenerator that yields CAR file bytes.
1 parent 8db7792 commit 7c07e11

File tree

4 files changed

+106
-30
lines changed

4 files changed

+106
-30
lines changed

packages/car/src/index.ts

+40-2
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,14 @@
5858
* ```
5959
*/
6060

61+
import { CarWriter } from '@ipld/car'
6162
import drain from 'it-drain'
6263
import map from 'it-map'
6364
import defer from 'p-defer'
6465
import PQueue from 'p-queue'
6566
import type { DAGWalker } from '@helia/interface'
6667
import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
67-
import type { CarReader, CarWriter } from '@ipld/car'
68+
import type { CarReader } from '@ipld/car'
6869
import type { AbortOptions } from '@libp2p/interfaces'
6970
import type { Blockstore } from 'interface-blockstore'
7071
import type { CID } from 'multiformats/cid'
@@ -129,6 +130,28 @@ export interface Car {
129130
* ```
130131
*/
131132
export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void>
133+
134+
/**
135+
* Returns an AsyncGenerator that yields CAR file bytes.
136+
*
137+
* @example
138+
*
139+
* ```typescript
140+
* import { createHelia } from 'helia'
141+
* import { car } from '@helia/car
142+
* import { CID } from 'multiformats/cid'
143+
*
144+
* const helia = await createHelia()
145+
* const cid = CID.parse('QmFoo...')
146+
*
147+
* const c = car(helia)
148+
*
149+
* for (const buf of c.stream(cid)) {
150+
* // store or send `buf` somewhere
151+
* }
152+
* ```
153+
*/
154+
stream(root: CID | CID[], options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): AsyncGenerator<Uint8Array>
132155
}
133156

134157
const DAG_WALK_QUEUE_CONCURRENCY = 1
@@ -148,7 +171,7 @@ class DefaultCar implements Car {
148171
}
149172

150173
async export (root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void> {
151-
const deferred = defer()
174+
const deferred = defer<Error | undefined>()
152175
const roots = Array.isArray(root) ? root : [root]
153176

154177
// use a queue to walk the DAG instead of recursion so we can traverse very large DAGs
@@ -159,6 +182,7 @@ class DefaultCar implements Car {
159182
deferred.resolve()
160183
})
161184
queue.on('error', (err) => {
185+
queue.clear()
162186
deferred.reject(err)
163187
})
164188

@@ -168,6 +192,7 @@ class DefaultCar implements Car {
168192
await writer.put({ cid, bytes })
169193
}, options)
170194
})
195+
.catch(() => {})
171196
}
172197

173198
// wait for the writer to end
@@ -178,6 +203,19 @@ class DefaultCar implements Car {
178203
}
179204
}
180205

206+
async * stream (root: CID | CID[], options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): AsyncGenerator<Uint8Array, void, undefined> {
207+
const { writer, out } = CarWriter.create(root)
208+
209+
// has to be done async so we write to `writer` and read from `out` at the
210+
// same time
211+
this.export(root, writer, options)
212+
.catch(() => {})
213+
214+
for await (const buf of out) {
215+
yield buf
216+
}
217+
}
218+
181219
/**
182220
* Walk the DAG behind the passed CID, ensure all blocks are present in the blockstore
183221
* and update the pin count for them
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import * as dagPb from '@ipld/dag-pb'
2+
import * as raw from 'multiformats/codecs/raw'
3+
import type { DAGWalker } from '@helia/interface'
4+
5+
/**
6+
* Dag walker for dag-pb CIDs
7+
*/
8+
const dagPbWalker: DAGWalker = {
9+
codec: dagPb.code,
10+
* walk (block) {
11+
const node = dagPb.decode(block)
12+
13+
yield * node.Links.map(l => l.Hash)
14+
}
15+
}
16+
17+
const rawWalker: DAGWalker = {
18+
codec: raw.code,
19+
* walk () {
20+
// no embedded CIDs in a raw block
21+
}
22+
}
23+
24+
export const dagWalkers = {
25+
[dagPb.code]: dagPbWalker,
26+
[raw.code]: rawWalker
27+
}

packages/car/test/index.spec.ts

+2-28
Original file line numberDiff line numberDiff line change
@@ -2,49 +2,23 @@
22

33
import { type UnixFS, unixfs } from '@helia/unixfs'
44
import { CarReader } from '@ipld/car'
5-
import * as dagPb from '@ipld/dag-pb'
65
import { expect } from 'aegir/chai'
76
import { MemoryBlockstore } from 'blockstore-core'
87
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
98
import toBuffer from 'it-to-buffer'
10-
import * as raw from 'multiformats/codecs/raw'
119
import { car, type Car } from '../src/index.js'
10+
import { dagWalkers } from './fixtures/dag-walkers.js'
1211
import { largeFile, smallFile } from './fixtures/files.js'
1312
import { memoryCarWriter } from './fixtures/memory-car.js'
14-
import type { DAGWalker } from '@helia/interface'
1513
import type { Blockstore } from 'interface-blockstore'
1614

17-
/**
18-
* Dag walker for dag-pb CIDs
19-
*/
20-
const dagPbWalker: DAGWalker = {
21-
codec: dagPb.code,
22-
* walk (block) {
23-
const node = dagPb.decode(block)
24-
25-
yield * node.Links.map(l => l.Hash)
26-
}
27-
}
28-
29-
const rawWalker: DAGWalker = {
30-
codec: raw.code,
31-
* walk () {
32-
// no embedded CIDs in a raw block
33-
}
34-
}
35-
36-
describe('import', () => {
15+
describe('import/export car file', () => {
3716
let blockstore: Blockstore
3817
let c: Car
3918
let u: UnixFS
40-
let dagWalkers: Record<number, DAGWalker>
4119

4220
beforeEach(async () => {
4321
blockstore = new MemoryBlockstore()
44-
dagWalkers = {
45-
[dagPb.code]: dagPbWalker,
46-
[raw.code]: rawWalker
47-
}
4822

4923
c = car({ blockstore, dagWalkers })
5024
u = unixfs({ blockstore })

packages/car/test/stream.spec.ts

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/* eslint-env mocha */
2+
3+
import { type UnixFS, unixfs } from '@helia/unixfs'
4+
import { expect } from 'aegir/chai'
5+
import { MemoryBlockstore } from 'blockstore-core'
6+
import toBuffer from 'it-to-buffer'
7+
import { car, type Car } from '../src/index.js'
8+
import { dagWalkers } from './fixtures/dag-walkers.js'
9+
import { smallFile } from './fixtures/files.js'
10+
import { memoryCarWriter } from './fixtures/memory-car.js'
11+
import type { Blockstore } from 'interface-blockstore'
12+
13+
describe('stream car file', () => {
14+
let blockstore: Blockstore
15+
let c: Car
16+
let u: UnixFS
17+
18+
beforeEach(async () => {
19+
blockstore = new MemoryBlockstore()
20+
21+
c = car({ blockstore, dagWalkers })
22+
u = unixfs({ blockstore })
23+
})
24+
25+
it('streams car file', async () => {
26+
const cid = await u.addBytes(smallFile)
27+
28+
const writer = memoryCarWriter(cid)
29+
await c.export(cid, writer)
30+
31+
const bytes = await writer.bytes()
32+
33+
const streamed = await toBuffer(c.stream(cid))
34+
35+
expect(bytes).to.equalBytes(streamed)
36+
})
37+
})

0 commit comments

Comments
 (0)