@@ -2,6 +2,7 @@ package blockstore
2
2
3
3
import (
4
4
"context"
5
+ "sync"
5
6
6
7
lru "github.com/hashicorp/golang-lru"
7
8
blocks "github.com/ipfs/go-block-format"
@@ -16,7 +17,9 @@ type cacheSize int
16
17
// block Cids. This provides block access-time improvements, allowing
17
18
// to short-cut many searches without query-ing the underlying datastore.
18
19
type arccache struct {
19
- arc * lru.TwoQueueCache
20
+ arc * lru.TwoQueueCache
21
+ lks [256 ]sync.Mutex
22
+
20
23
blockstore Blockstore
21
24
22
25
hits metrics.Counter
@@ -28,7 +31,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
28
31
if err != nil {
29
32
return nil , err
30
33
}
31
- c := & arccache {arc : arc , blockstore : bs }
34
+ c := & arccache {arc : arc , lks : [ 256 ]sync. Mutex {}, blockstore : bs }
32
35
c .hits = metrics .NewCtx (ctx , "arc.hits_total" , "Number of ARC cache hits" ).Counter ()
33
36
c .total = metrics .NewCtx (ctx , "arc_total" , "Total number of ARC cache requests" ).Counter ()
34
37
@@ -40,6 +43,9 @@ func (b *arccache) DeleteBlock(k cid.Cid) error {
40
43
return nil
41
44
}
42
45
46
+ b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Lock ()
47
+ defer b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Unlock ()
48
+
43
49
b .arc .Remove (k ) // Invalidate cache before deleting.
44
50
err := b .blockstore .DeleteBlock (k )
45
51
if err == nil {
@@ -76,6 +82,10 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
76
82
if has , _ , ok := b .hasCached (k ); ok {
77
83
return has , nil
78
84
}
85
+
86
+ b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Lock ()
87
+ defer b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Unlock ()
88
+
79
89
has , err := b .blockstore .Has (k )
80
90
if err != nil {
81
91
return false , err
@@ -96,6 +106,10 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
96
106
}
97
107
// we have it but don't know the size, ask the datastore.
98
108
}
109
+
110
+ b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Lock ()
111
+ defer b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Unlock ()
112
+
99
113
blockSize , err := b .blockstore .GetSize (k )
100
114
if err == ErrNotFound {
101
115
b .cacheHave (k , false )
@@ -115,6 +129,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
115
129
return nil , ErrNotFound
116
130
}
117
131
132
+ b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Lock ()
133
+ defer b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Unlock ()
134
+
118
135
bl , err := b .blockstore .Get (k )
119
136
if bl == nil && err == ErrNotFound {
120
137
b .cacheHave (k , false )
@@ -129,6 +146,9 @@ func (b *arccache) Put(bl blocks.Block) error {
129
146
return nil
130
147
}
131
148
149
+ b .lks [bl .Cid ().Bytes ()[len (bl .Cid ().Bytes ())- 1 ]].Lock ()
150
+ defer b .lks [bl .Cid ().Bytes ()[len (bl .Cid ().Bytes ())- 1 ]].Unlock ()
151
+
132
152
err := b .blockstore .Put (bl )
133
153
if err == nil {
134
154
b .cacheSize (bl .Cid (), len (bl .RawData ()))
@@ -141,9 +161,13 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
141
161
for _ , block := range bs {
142
162
// call put on block if result is inconclusive or we are sure that
143
163
// the block isn't in storage
164
+
144
165
if has , _ , ok := b .hasCached (block .Cid ()); ! ok || (ok && ! has ) {
145
166
good = append (good , block )
167
+ b .lks [block .Cid ().Bytes ()[len (block .Cid ().Bytes ())- 1 ]].Lock ()
168
+ defer b .lks [block .Cid ().Bytes ()[len (block .Cid ().Bytes ())- 1 ]].Unlock ()
146
169
}
170
+
147
171
}
148
172
err := b .blockstore .PutMany (good )
149
173
if err != nil {
0 commit comments