@@ -2,6 +2,7 @@ package blockstore
2
2
3
3
import (
4
4
"context"
5
+ "sync"
5
6
6
7
lru "github.com/hashicorp/golang-lru"
7
8
blocks "github.com/ipfs/go-block-format"
@@ -17,7 +18,9 @@ type cacheSize int
17
18
// size. This provides block access-time improvements, allowing
18
19
// to short-cut many searches without querying the underlying datastore.
19
20
type arccache struct {
20
- cache * lru.TwoQueueCache
21
+ cache * lru.TwoQueueCache
22
+ lks [256 ]sync.Mutex
23
+
21
24
blockstore Blockstore
22
25
viewer Viewer
23
26
@@ -33,7 +36,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
33
36
if err != nil {
34
37
return nil , err
35
38
}
36
- c := & arccache {cache : cache , blockstore : bs }
39
+ c := & arccache {cache : cache , lks : [ 256 ]sync. Mutex {}, blockstore : bs }
37
40
c .hits = metrics .NewCtx (ctx , "arc.hits_total" , "Number of ARC cache hits" ).Counter ()
38
41
c .total = metrics .NewCtx (ctx , "arc_total" , "Total number of ARC cache requests" ).Counter ()
39
42
if v , ok := bs .(Viewer ); ok {
@@ -47,6 +50,9 @@ func (b *arccache) DeleteBlock(k cid.Cid) error {
47
50
return nil
48
51
}
49
52
53
+ b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Lock ()
54
+ defer b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Unlock ()
55
+
50
56
b .cache .Remove (k ) // Invalidate cache before deleting.
51
57
err := b .blockstore .DeleteBlock (k )
52
58
if err == nil {
@@ -59,6 +65,10 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
59
65
if has , _ , ok := b .queryCache (k ); ok {
60
66
return has , nil
61
67
}
68
+
69
+ b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Lock ()
70
+ defer b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Unlock ()
71
+
62
72
has , err := b .blockstore .Has (k )
63
73
if err != nil {
64
74
return false , err
@@ -79,6 +89,10 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
79
89
}
80
90
// we have it but don't know the size, ask the datastore.
81
91
}
92
+
93
+ b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Lock ()
94
+ defer b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Unlock ()
95
+
82
96
blockSize , err := b .blockstore .GetSize (k )
83
97
if err == ErrNotFound {
84
98
b .cacheHave (k , false )
@@ -110,6 +124,9 @@ func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
110
124
return ErrNotFound
111
125
}
112
126
127
+ b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Lock ()
128
+ defer b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Unlock ()
129
+
113
130
return b .viewer .View (k , callback )
114
131
}
115
132
@@ -123,6 +140,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
123
140
return nil , ErrNotFound
124
141
}
125
142
143
+ b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Lock ()
144
+ defer b .lks [k .Bytes ()[len (k .Bytes ())- 1 ]].Unlock ()
145
+
126
146
bl , err := b .blockstore .Get (k )
127
147
if bl == nil && err == ErrNotFound {
128
148
b .cacheHave (k , false )
@@ -137,6 +157,9 @@ func (b *arccache) Put(bl blocks.Block) error {
137
157
return nil
138
158
}
139
159
160
+ b .lks [bl .Cid ().Bytes ()[len (bl .Cid ().Bytes ())- 1 ]].Lock ()
161
+ defer b .lks [bl .Cid ().Bytes ()[len (bl .Cid ().Bytes ())- 1 ]].Unlock ()
162
+
140
163
err := b .blockstore .Put (bl )
141
164
if err == nil {
142
165
b .cacheSize (bl .Cid (), len (bl .RawData ()))
@@ -151,6 +174,8 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
151
174
// the block isn't in storage
152
175
if has , _ , ok := b .queryCache (block .Cid ()); ! ok || (ok && ! has ) {
153
176
good = append (good , block )
177
+ b .lks [block .Cid ().Bytes ()[len (block .Cid ().Bytes ())- 1 ]].Lock ()
178
+ defer b .lks [block .Cid ().Bytes ()[len (block .Cid ().Bytes ())- 1 ]].Unlock ()
154
179
}
155
180
}
156
181
err := b .blockstore .PutMany (good )
0 commit comments