
Commit 52d4786

fix: simplify shard handling (#5)

Reduces duplication between adding and removing links to sharded directories by making both operations use the same code to trace a path to the operation target, without loading the entire shard.
1 parent 3f55208 commit 52d4786

16 files changed, +545 -588 lines changed
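
The shared traversal the commit message describes can be sketched as follows. This is a minimal illustration of the idea, not the library's code: ShardSegment, tracePath, takePrefix and loadNode are stand-in names for this note, while the real helper is the recreateShardedDirectory import that appears in the diff below.

// A minimal sketch of the shared path-tracing idea, assuming simplified
// stand-in types rather than the library's real definitions.
import type { CID } from 'multiformats/cid'

interface PBLink { Name?: string, Hash: CID, Tsize?: number }
interface PBNode { Links: PBLink[] }

// one level of the HAMT on the way to the operation target
interface ShardSegment {
  prefix: string // two-character hex label at this level
  node: PBNode   // the decoded shard block at this level
}

// Walk from the root shard towards a link name, consuming hash bits at each
// level and loading only the blocks on the path, never the whole shard.
async function tracePath (
  root: PBNode,
  takePrefix: () => Promise<string>, // yields the next prefix from the hashed name
  loadNode: (cid: CID) => Promise<PBNode>
): Promise<ShardSegment[]> {
  const path: ShardSegment[] = []
  let node = root

  while (true) {
    const prefix = await takePrefix()
    path.push({ prefix, node })

    // by convention a link named with just the two-character prefix is a sub-shard
    const subShard = node.Links.find(l => l.Name === prefix)

    if (subShard == null) {
      // no deeper level: this segment is where the add/remove happens
      return path
    }

    node = await loadNode(subShard.Hash)
  }
}

Add and remove can then both edit the final segment of the returned path and rewrite only the blocks along it, rather than rebuilding the whole sharded directory in memory.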

package.json (+2 -2)

@@ -139,7 +139,6 @@
     "release": "aegir release"
   },
   "dependencies": {
-    "@helia/interface": "next",
     "@ipld/dag-pb": "^4.0.0",
     "@libp2p/interfaces": "^3.3.1",
     "@libp2p/logger": "^2.0.5",
@@ -152,7 +151,8 @@
     "it-last": "^2.0.0",
     "it-pipe": "^2.0.5",
     "merge-options": "^3.0.4",
-    "multiformats": "^11.0.1"
+    "multiformats": "^11.0.1",
+    "sparse-array": "^1.3.2"
   },
   "devDependencies": {
     "aegir": "^38.1.0",

src/commands/utils/add-link.ts (+107 -170)
@@ -2,32 +2,31 @@ import * as dagPB from '@ipld/dag-pb'
 import { CID, Version } from 'multiformats/cid'
 import { logger } from '@libp2p/logger'
 import { UnixFS } from 'ipfs-unixfs'
-import { DirSharded } from './dir-sharded.js'
 import {
-  updateHamtDirectory,
-  recreateHamtLevel,
-  recreateInitialHamtLevel,
   createShard,
+  recreateShardedDirectory,
   toPrefix,
-  addLinksToHamtBucket
+  updateShardedDirectory
 } from './hamt-utils.js'
-import last from 'it-last'
 import type { PBNode, PBLink } from '@ipld/dag-pb/interface'
 import { sha256 } from 'multiformats/hashes/sha2'
-import type { Bucket } from 'hamt-sharding'
 import { AlreadyExistsError, InvalidParametersError, InvalidPBNodeError } from './errors.js'
 import type { ImportResult } from 'ipfs-unixfs-importer'
 import type { AbortOptions } from '@libp2p/interfaces'
 import type { Directory } from './cid-to-directory.js'
 import type { Blockstore } from 'interface-blockstore'
 import { isOverShardThreshold } from './is-over-shard-threshold.js'
+import { hamtBucketBits, hamtHashFn } from './hamt-constants.js'
+// @ts-expect-error no types
+import SparseArray from 'sparse-array'
+import { wrapHash } from './consumable-hash.js'
+import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
 
 const log = logger('helia:unixfs:components:utils:add-link')
 
 export interface AddLinkResult {
   node: PBNode
   cid: CID
-  size: number
 }
 
 export interface AddLinkOptions extends AbortOptions {
@@ -81,7 +80,7 @@ const convertToShardedDirectory = async (parent: Directory, blockstore: Blocksto
     cidVersion: parent.cid.version
   })
 
-  log(`Converted directory to sharded directory ${result.cid}`)
+  log(`converted directory to sharded directory ${result.cid}`)
 
   return result
 }
@@ -134,187 +133,125 @@ const addToDirectory = async (parent: Directory, child: PBLink, blockstore: Bloc
 
   return {
     node: parent.node,
-    cid,
-    size: buf.length
+    cid
   }
 }
 
 const addToShardedDirectory = async (parent: Directory, child: Required<PBLink>, blockstore: Blockstore, options: AddLinkOptions): Promise<AddLinkResult> => {
-  const {
-    shard, path
-  } = await addFileToShardedDirectory(parent, child, blockstore, options)
-  const result = await last(shard.flush(blockstore))
+  const { path, hash } = await recreateShardedDirectory(parent.cid, child.Name, blockstore, options)
+  const finalSegment = path[path.length - 1]
 
-  if (result == null) {
-    throw new Error('No result from flushing shard')
+  if (finalSegment == null) {
+    throw new Error('Invalid HAMT, could not generate path')
   }
 
-  const block = await blockstore.get(result.cid)
-  const node = dagPB.decode(block)
+  // find the next prefix
+  // const index = await hash.take(hamtBucketBits)
+  const prefix = finalSegment.prefix
+  const index = parseInt(prefix, 16)
 
-  // we have written out the shard, but only one sub-shard will have been written so replace it in the original shard
-  const parentLinks = parent.node.Links.filter((link) => {
-    return (link.Name ?? '').substring(0, 2) !== path[0].prefix
-  })
-
-  const newLink = node.Links
-    .find(link => (link.Name ?? '').substring(0, 2) === path[0].prefix)
-
-  if (newLink == null) {
-    throw new Error(`No link found with prefix ${path[0].prefix}`)
-  }
-
-  parentLinks.push(newLink)
-
-  return await updateHamtDirectory({
-    Data: parent.node.Data,
-    Links: parentLinks
-  }, blockstore, path[0].bucket, options)
-}
+  log('next prefix for %s is %s', child.Name, prefix)
 
-const addFileToShardedDirectory = async (parent: Directory, child: Required<PBLink>, blockstore: Blockstore, options: AddLinkOptions): Promise<{ shard: DirSharded, path: BucketPath[] }> => {
-  if (parent.node.Data == null) {
-    throw new InvalidPBNodeError('Parent node with no data passed to addFileToShardedDirectory')
-  }
+  const linkName = `${prefix}${child.Name}`
+  const existingLink = finalSegment.node.Links.find(l => (l.Name ?? '').startsWith(prefix))
 
-  // start at the root bucket and descend, loading nodes as we go
-  const rootBucket = await recreateInitialHamtLevel(parent.node.Links)
-  const node = UnixFS.unmarshal(parent.node.Data)
-
-  const shard = new DirSharded({
-    root: true,
-    dir: true,
-    parent: undefined,
-    parentKey: undefined,
-    path: '',
-    dirty: true,
-    flat: false,
-    mode: node.mode
-  }, {
-    ...options,
-    cidVersion: parent.cid.version
-  })
-  shard._bucket = rootBucket
-
-  if (node.mtime != null) {
-    // update mtime if previously set
-    shard.mtime = {
-      secs: BigInt(Math.round(Date.now() / 1000))
-    }
-  }
-
-  // load subshards until the bucket & position no longer changes
-  const position = await rootBucket._findNewBucketAndPos(child.Name)
-  const path = toBucketPath(position)
-  path[0].node = parent.node
-  let index = 0
-
-  while (index < path.length) {
-    const segment = path[index]
-    index++
-    const node = segment.node
-
-    if (node == null) {
-      throw new Error('Segment had no node')
-    }
-
-    const link = node.Links
-      .find(link => (link.Name ?? '').substring(0, 2) === segment.prefix)
-
-    if (link == null) {
-      // prefix is new, file will be added to the current bucket
-      log(`Link ${segment.prefix}${child.Name} will be added`)
-      index = path.length
-
-      break
-    }
+  if (existingLink != null) {
+    log('link %s was present in shard', linkName)
+    // link is already present in shard
 
-    if (link.Name === `${segment.prefix}${child.Name}`) {
+    if (existingLink.Name === linkName) {
+      // file with same name is already present in shard
       if (!options.allowOverwriting) {
         throw new AlreadyExistsError()
       }
 
-      // file already existed, file will be added to the current bucket
-      log(`Link ${segment.prefix}${child.Name} will be replaced`)
-      index = path.length
-
-      break
-    }
-
-    if ((link.Name ?? '').length > 2) {
-      // another file had the same prefix, will be replaced with a subshard
-      log(`Link ${link.Name} ${link.Hash} will be replaced with a subshard`)
-      index = path.length
-
-      break
-    }
-
-    // load sub-shard
-    log(`Found subshard ${segment.prefix}`)
-    const block = await blockstore.get(link.Hash)
-    const subShard = dagPB.decode(block)
-
-    // subshard hasn't been loaded, descend to the next level of the HAMT
-    if (path[index] == null) {
-      log(`Loaded new subshard ${segment.prefix}`)
-      await recreateHamtLevel(blockstore, subShard.Links, rootBucket, segment.bucket, parseInt(segment.prefix, 16), options)
-
-      const position = await rootBucket._findNewBucketAndPos(child.Name)
-
-      path.push({
-        bucket: position.bucket,
-        prefix: toPrefix(position.pos),
-        node: subShard
+      log('overwriting %s in subshard', child.Name)
+      finalSegment.node.Links = finalSegment.node.Links.filter(l => l.Name !== linkName)
+      finalSegment.node.Links.push({
+        Name: linkName,
+        Hash: child.Hash,
+        Tsize: child.Tsize
       })
+    } else if (existingLink.Name?.length === 2) {
+      throw new Error('Existing link was subshard?!')
+    } else {
+      // conflict, add a new HAMT segment
+      log('prefix %s already exists, creating new subshard', prefix)
+      // find the sibling we are going to replace
+      const index = finalSegment.node.Links.findIndex(l => l.Name?.startsWith(prefix))
+      const sibling = finalSegment.node.Links.splice(index, 1)[0]
+
+      // give the sibling a new HAMT prefix
+      const siblingName = (sibling.Name ?? '').substring(2)
+      const wrapped = wrapHash(hamtHashFn)
+      const siblingHash = wrapped(uint8ArrayFromString(siblingName))
+
+      // discard hash bits until we reach the subshard depth
+      for (let i = 0; i < path.length; i++) {
+        await siblingHash.take(hamtBucketBits)
+      }
 
-      break
+      while (true) {
+        const siblingIndex = await siblingHash.take(hamtBucketBits)
+        const siblingPrefix = toPrefix(siblingIndex)
+        sibling.Name = `${siblingPrefix}${siblingName}`
+
+        // calculate the target file's HAMT prefix in the new sub-shard
+        const newIndex = await hash.take(hamtBucketBits)
+        const newPrefix = toPrefix(newIndex)
+
+        if (siblingPrefix === newPrefix) {
+          // the two sibling names have caused another conflict - add an intermediate node to
+          // the HAMT and try again
+
+          // create the child locations
+          const children = new SparseArray()
+          children.set(newIndex, true)
+
+          path.push({
+            prefix: newPrefix,
+            children,
+            node: {
+              Links: []
+            }
+          })
+
+          continue
+        }
+
+        // create the child locations
+        const children = new SparseArray()
+        children.set(newIndex, true)
+        children.set(siblingIndex, true)
+
+        // add our new segment
+        path.push({
+          prefix,
+          children,
+          node: {
+            Links: [
+              sibling, {
+                Name: `${newPrefix}${child.Name}`,
+                Hash: child.Hash,
+                Tsize: child.Tsize
+              }
+            ]
+          }
+        })
+
+        break
+      }
     }
+  } else {
+    log('link %s was not present in sub-shard', linkName)
 
-    const nextSegment = path[index]
-
-    // add next levels worth of links to bucket
-    await addLinksToHamtBucket(blockstore, subShard.Links, nextSegment.bucket, rootBucket, options)
-
-    nextSegment.node = subShard
-  }
-
-  // finally add the new file into the shard
-  await shard._bucket.put(child.Name, {
-    size: BigInt(child.Tsize),
-    cid: child.Hash
-  })
-
-  return {
-    shard, path
-  }
-}
-
-export interface BucketPath {
-  bucket: Bucket<any>
-  prefix: string
-  node?: PBNode
-}
-
-const toBucketPath = (position: { pos: number, bucket: Bucket<any> }): BucketPath[] => {
-  const path = [{
-    bucket: position.bucket,
-    prefix: toPrefix(position.pos)
-  }]
-
-  let bucket = position.bucket._parent
-  let positionInBucket = position.bucket._posAtParent
-
-  while (bucket != null) {
-    path.push({
-      bucket,
-      prefix: toPrefix(positionInBucket)
-    })
+    // add new link to shard
+    child.Name = linkName
+    finalSegment.node.Links.push(child)
+    finalSegment.children.set(index, true)
 
-    positionInBucket = bucket._posAtParent
-    bucket = bucket._parent
+    log('adding %s to existing sub-shard', linkName)
   }
 
-  path.reverse()
-
-  return path
+  return await updateShardedDirectory(path, blockstore, options)
 }
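
The conflict branch above leans on the HAMT prefix scheme: each level of the trie consumes hamtBucketBits bits from an effectively infinite hash of the link name, and toPrefix renders the resulting bucket index as the label embedded in link names. The sketch below is a simplification assuming the default 8-bit bucket size (256-way fan-out) and a digest long enough that no re-hashing is needed; the real wrapHash/hamtHashFn helpers handle taking non-byte-aligned bit counts and extending the digest on demand.

// Sketch only: derives successive per-level HAMT prefixes from a digest.
// Assumes 8-bit buckets so each level consumes exactly one byte.
const hamtBucketBits = 8 // assumed default: 256 children per shard

// render a bucket index as a two-character hex label, as seen in the link names
const toPrefix = (index: number): string =>
  index.toString(16).toUpperCase().padStart(2, '0')

function * prefixes (digest: Uint8Array): Generator<string> {
  for (const byte of digest) {
    yield toPrefix(byte) // one byte per level at 8 bucket bits
  }
}

// A digest starting 0x1f 0xd4 ... files the link under "1F" at the root,
// then under "D4" one sub-shard down, and so on until the name no longer
// collides with a sibling - the same loop the conflict branch runs with
// siblingHash.take() and hash.take().
for (const prefix of prefixes(Uint8Array.from([0x1f, 0xd4]))) {
  console.log(prefix) // "1F", then "D4"
}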
