@@ -3,6 +3,7 @@ package blockstore
3
3
import (
4
4
"context"
5
5
6
+ "github.com/gammazero/kmutex"
6
7
lru "github.com/hashicorp/golang-lru"
7
8
blocks "github.com/ipfs/go-block-format"
8
9
cid "github.com/ipfs/go-cid"
@@ -17,7 +18,9 @@ type cacheSize int
17
18
// size. This provides block access-time improvements, allowing
18
19
// to short-cut many searches without querying the underlying datastore.
19
20
type arccache struct {
20
- cache * lru.TwoQueueCache
21
+ cache * lru.TwoQueueCache
22
+ arcKMutex * kmutex.Kmutex
23
+
21
24
blockstore Blockstore
22
25
viewer Viewer
23
26
@@ -33,7 +36,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
33
36
if err != nil {
34
37
return nil , err
35
38
}
36
- c := & arccache {cache : cache , blockstore : bs }
39
+ c := & arccache {cache : cache , arcKMutex : kmutex . New (), blockstore : bs }
37
40
c .hits = metrics .NewCtx (ctx , "arc.hits_total" , "Number of ARC cache hits" ).Counter ()
38
41
c .total = metrics .NewCtx (ctx , "arc_total" , "Total number of ARC cache requests" ).Counter ()
39
42
if v , ok := bs .(Viewer ); ok {
@@ -47,6 +50,9 @@ func (b *arccache) DeleteBlock(k cid.Cid) error {
47
50
return nil
48
51
}
49
52
53
+ b .arcKMutex .Lock (k )
54
+ defer b .arcKMutex .Unlock (k )
55
+
50
56
b .cache .Remove (k ) // Invalidate cache before deleting.
51
57
err := b .blockstore .DeleteBlock (k )
52
58
if err == nil {
@@ -79,6 +85,10 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
79
85
}
80
86
// we have it but don't know the size, ask the datastore.
81
87
}
88
+
89
+ b .arcKMutex .Lock (k )
90
+ defer b .arcKMutex .Unlock (k )
91
+
82
92
blockSize , err := b .blockstore .GetSize (k )
83
93
if err == ErrNotFound {
84
94
b .cacheHave (k , false )
@@ -123,6 +133,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
123
133
return nil , ErrNotFound
124
134
}
125
135
136
+ b .arcKMutex .Lock (k )
137
+ defer b .arcKMutex .Unlock (k )
138
+
126
139
bl , err := b .blockstore .Get (k )
127
140
if bl == nil && err == ErrNotFound {
128
141
b .cacheHave (k , false )
@@ -137,6 +150,9 @@ func (b *arccache) Put(bl blocks.Block) error {
137
150
return nil
138
151
}
139
152
153
+ b .arcKMutex .Lock (bl .Cid ())
154
+ defer b .arcKMutex .Unlock (bl .Cid ())
155
+
140
156
err := b .blockstore .Put (bl )
141
157
if err == nil {
142
158
b .cacheSize (bl .Cid (), len (bl .RawData ()))
@@ -151,6 +167,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
151
167
// the block isn't in storage
152
168
if has , _ , ok := b .queryCache (block .Cid ()); ! ok || (ok && ! has ) {
153
169
good = append (good , block )
170
+ b .arcKMutex .Lock (block .Cid ())
154
171
}
155
172
}
156
173
err := b .blockstore .PutMany (good )
@@ -159,6 +176,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
159
176
}
160
177
for _ , block := range good {
161
178
b .cacheSize (block .Cid (), len (block .RawData ()))
179
+ b .arcKMutex .Unlock (block .Cid ())
162
180
}
163
181
return nil
164
182
}
0 commit comments