Skip to content

Commit d97e634

Browse files
distractedm1ndrenaynayWondertan
authored
fix(share/eds): use cached accessors in GetCAR and GetDAH (#2000)
Closes #1514 . This PR enables accessor cache usage for GetCAR and GetDAH. This will allow shrexeds and shrexnd servers to only require opening the underlying file once (until removed from cache). If many peers request the EDS at once, the server previously needed to open the file each time. Will still require a dependency bump of our dagstore fork for the new test to not fail - so draft until then. Related: celestiaorg/dagstore#2 --------- Co-authored-by: rene <41963722+renaynay@users.noreply.github.com> Co-authored-by: Hlib Kanunnikov <hlibwondertan@gmail.com>
1 parent e14995e commit d97e634

File tree

6 files changed

+71
-21
lines changed

6 files changed

+71
-21
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ require (
327327

328328
replace (
329329
github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7
330-
github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659
330+
github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136
331331
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
332332
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23
333333
)

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,8 @@ github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23 h1:BHvn41IHOtvHeX1VZqO/
206206
github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23/go.mod h1:nL+vkAMKy/A8wWemWqMwBy4pOGWYYbboAVTEe3N5gIU=
207207
github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7 h1:EADZy33ufskVIy6Rj6jbi3SOVCeYYo26zUi7iYx+QR0=
208208
github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7/go.mod h1:vg3Eza9adJJ5Mdx6boz5MpZsZcTZyrfTVYZHyi2zLm4=
209-
github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659 h1:f3205vw3GYBtMiNoS+qB6IuHSs50Iwqsm9lNIikLTCk=
210-
github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659/go.mod h1:ta/DlqIH10bvhwqJIw51Nq3QU4XVMp6pz3f0Deve9fM=
209+
github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136 h1:LBvY3NDA18fcS72pBAEd2pENoUpz1iV4cCXBN2Zrj/I=
210+
github.com/celestiaorg/dagstore v0.0.0-20230404123415-177451f83136/go.mod h1:ta/DlqIH10bvhwqJIw51Nq3QU4XVMp6pz3f0Deve9fM=
211211
github.com/celestiaorg/go-header v0.2.3 h1:41r60OtAeexWC3J3eTELgWfzcdKR2taFlfcJ/2IHZD4=
212212
github.com/celestiaorg/go-header v0.2.3/go.mod h1:6XKf0yhoEQqfKQTZnyTZjTjF5jH5Wq9uO9AvDMkdYbs=
213213
github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao=

share/eds/accessor_cache.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlock
6363
lk.Lock()
6464
defer lk.Unlock()
6565

66+
return bc.unsafeGet(shardContainingCid)
67+
}
68+
69+
func (bc *blockstoreCache) unsafeGet(shardContainingCid shard.Key) (*accessorWithBlockstore, error) {
6670
// We've already ensured that the given shard has the cid/multihash we are looking for.
6771
val, ok := bc.cache.Get(shardContainingCid)
6872
if !ok {
@@ -83,16 +87,23 @@ func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlock
8387
func (bc *blockstoreCache) Add(
8488
shardContainingCid shard.Key,
8589
accessor *dagstore.ShardAccessor,
90+
) (*accessorWithBlockstore, error) {
91+
lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)]
92+
lk.Lock()
93+
defer lk.Unlock()
94+
95+
return bc.unsafeAdd(shardContainingCid, accessor)
96+
}
97+
98+
func (bc *blockstoreCache) unsafeAdd(
99+
shardContainingCid shard.Key,
100+
accessor *dagstore.ShardAccessor,
86101
) (*accessorWithBlockstore, error) {
87102
blockStore, err := accessor.Blockstore()
88103
if err != nil {
89104
return nil, fmt.Errorf("failed to get blockstore from accessor: %w", err)
90105
}
91106

92-
lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)]
93-
lk.Lock()
94-
defer lk.Unlock()
95-
96107
newAccessor := &accessorWithBlockstore{
97108
bs: blockStore,
98109
sa: accessor,

share/eds/store.go

+13-11
Original file line numberDiff line numberDiff line change
@@ -204,17 +204,18 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
204204
// The Reader strictly reads the CAR header and first quadrant (1/4) of the EDS, omitting all the
205205
// NMT Merkle proofs. Integrity of the store data is not verified.
206206
//
207-
// Caller must Close returned reader after reading.
208-
func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.ReadCloser, error) {
207+
// The shard is cached in the Store, so subsequent calls to GetCAR with the same root will use the same reader.
208+
// The cache is responsible for closing the underlying reader.
209+
func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.Reader, error) {
209210
ctx, span := tracer.Start(ctx, "store/get-car", trace.WithAttributes(attribute.String("root", root.String())))
210211
defer span.End()
211212

212213
key := root.String()
213-
accessor, err := s.getAccessor(ctx, shard.KeyFromString(key))
214+
accessor, err := s.getCachedAccessor(ctx, shard.KeyFromString(key))
214215
if err != nil {
215216
return nil, fmt.Errorf("failed to get accessor: %w", err)
216217
}
217-
return accessor, nil
218+
return accessor.sa.Reader(), nil
218219
}
219220

220221
// Blockstore returns an IPFS blockstore providing access to individual shares/nodes of all EDS
@@ -247,13 +248,12 @@ func (s *Store) GetDAH(ctx context.Context, root share.DataHash) (*share.Root, e
247248
defer span.End()
248249

249250
key := shard.KeyFromString(root.String())
250-
accessor, err := s.getAccessor(ctx, key)
251+
accessor, err := s.getCachedAccessor(ctx, key)
251252
if err != nil {
252253
return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err)
253254
}
254-
defer accessor.Close()
255255

256-
carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor))
256+
carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor.sa.Reader()))
257257
if err != nil {
258258
return nil, fmt.Errorf("eds/store: failed to read car header: %w", err)
259259
}
@@ -297,8 +297,11 @@ func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*dagstore.Shard
297297
}
298298

299299
func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessorWithBlockstore, error) {
300-
// try to fetch from cache
301-
accessor, err := s.cache.Get(key)
300+
lk := &s.cache.stripedLocks[shardKeyToStriped(key)]
301+
lk.Lock()
302+
defer lk.Unlock()
303+
304+
accessor, err := s.cache.unsafeGet(key)
302305
if err != nil && err != errCacheMiss {
303306
log.Errorf("unexpected error while reading key from bs cache %s: %s", key, err)
304307
}
@@ -311,7 +314,7 @@ func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessor
311314
if err != nil {
312315
return nil, err
313316
}
314-
return s.cache.Add(key, shardAccessor)
317+
return s.cache.unsafeAdd(key, shardAccessor)
315318
}
316319

317320
// Remove removes EDS from Store by the given share.Root hash and cleans up all
@@ -367,7 +370,6 @@ func (s *Store) Get(ctx context.Context, root share.DataHash) (eds *rsmt2d.Exten
367370
if err != nil {
368371
return nil, fmt.Errorf("failed to get CAR file: %w", err)
369372
}
370-
defer f.Close()
371373
eds, err = ReadEDS(ctx, f, root)
372374
if err != nil {
373375
return nil, fmt.Errorf("failed to read EDS from CAR file: %w", err)

share/eds/store_test.go

+37
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,43 @@ func Test_BlockstoreCache(t *testing.T) {
206206
assert.NoError(t, err, errCacheMiss)
207207
}
208208

209+
// Test_CachedAccessor verifies that the reader represented by a cached accessor can be read from
210+
// multiple times, without exhausting the underlying reader.
211+
func Test_CachedAccessor(t *testing.T) {
212+
ctx, cancel := context.WithCancel(context.Background())
213+
t.Cleanup(cancel)
214+
215+
edsStore, err := newStore(t)
216+
require.NoError(t, err)
217+
err = edsStore.Start(ctx)
218+
require.NoError(t, err)
219+
220+
eds, dah := randomEDS(t)
221+
err = edsStore.Put(ctx, dah.Hash(), eds)
222+
require.NoError(t, err)
223+
224+
shardKey := shard.KeyFromString(dah.String())
225+
// adds to cache
226+
cachedAccessor, err := edsStore.getCachedAccessor(ctx, shardKey)
227+
assert.NoError(t, err)
228+
229+
// first read
230+
carReader, err := car.NewCarReader(cachedAccessor.sa.Reader())
231+
assert.NoError(t, err)
232+
firstBlock, err := carReader.Next()
233+
assert.NoError(t, err)
234+
235+
// second read
236+
cachedAccessor, err = edsStore.getCachedAccessor(ctx, shardKey)
237+
assert.NoError(t, err)
238+
carReader, err = car.NewCarReader(cachedAccessor.sa.Reader())
239+
assert.NoError(t, err)
240+
secondBlock, err := carReader.Next()
241+
assert.NoError(t, err)
242+
243+
assert.Equal(t, firstBlock, secondBlock)
244+
}
245+
209246
func newStore(t *testing.T) (*Store, error) {
210247
t.Helper()
211248

share/p2p/shrexeds/server.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,11 @@ func (s *Server) handleStream(stream network.Stream) {
8080
defer cancel()
8181
status := p2p_pb.Status_OK
8282
// determine whether the EDS is available in our store
83+
// we do not close the reader, so that other requests will not need to re-open the file.
84+
// closing is handled by the LRU cache.
8385
edsReader, err := s.store.GetCAR(ctx, hash)
8486
if err != nil {
8587
status = p2p_pb.Status_NOT_FOUND
86-
} else {
87-
defer edsReader.Close()
8888
}
8989

9090
// inform the client of our status
@@ -144,7 +144,7 @@ func (s *Server) writeStatus(status p2p_pb.Status, stream network.Stream) error
144144
return err
145145
}
146146

147-
func (s *Server) writeODS(edsReader io.ReadCloser, stream network.Stream) error {
147+
func (s *Server) writeODS(edsReader io.Reader, stream network.Stream) error {
148148
err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout))
149149
if err != nil {
150150
log.Debug(err)

0 commit comments

Comments
 (0)