@@ -2,6 +2,7 @@ package blockstore
2
2
3
3
import (
4
4
"context"
5
+ "sync"
5
6
6
7
lru "github.com/hashicorp/golang-lru"
7
8
blocks "github.com/ipfs/go-block-format"
@@ -17,7 +18,9 @@ type cacheSize int
17
18
// size. This provides block access-time improvements, allowing
18
19
// to short-cut many searches without querying the underlying datastore.
19
20
type arccache struct {
20
- cache * lru.TwoQueueCache
21
+ cache * lru.TwoQueueCache
22
+ lks [256 ]sync.RWMutex
23
+
21
24
blockstore Blockstore
22
25
viewer Viewer
23
26
@@ -42,11 +45,27 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
42
45
return c , nil
43
46
}
44
47
48
+ func (b * arccache ) getLock (k cid.Cid ) * sync.RWMutex {
49
+ return & b .lks [mutexKey (k )]
50
+ }
51
+
52
+ func mutexKey (k cid.Cid ) uint8 {
53
+ return k .KeyString ()[len (k .KeyString ())- 1 ]
54
+ }
55
+
45
56
func (b * arccache ) DeleteBlock (k cid.Cid ) error {
57
+ if ! k .Defined () {
58
+ return nil
59
+ }
60
+
46
61
if has , _ , ok := b .queryCache (k ); ok && ! has {
47
62
return nil
48
63
}
49
64
65
+ lk := b .getLock (k )
66
+ lk .Lock ()
67
+ defer lk .Unlock ()
68
+
50
69
b .cache .Remove (k ) // Invalidate cache before deleting.
51
70
err := b .blockstore .DeleteBlock (k )
52
71
if err == nil {
@@ -56,9 +75,18 @@ func (b *arccache) DeleteBlock(k cid.Cid) error {
56
75
}
57
76
58
77
func (b * arccache ) Has (k cid.Cid ) (bool , error ) {
78
+ if ! k .Defined () {
79
+ return false , nil
80
+ }
81
+
59
82
if has , _ , ok := b .queryCache (k ); ok {
60
83
return has , nil
61
84
}
85
+
86
+ lk := b .getLock (k )
87
+ lk .RLock ()
88
+ defer lk .RUnlock ()
89
+
62
90
has , err := b .blockstore .Has (k )
63
91
if err != nil {
64
92
return false , err
@@ -68,6 +96,10 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
68
96
}
69
97
70
98
func (b * arccache ) GetSize (k cid.Cid ) (int , error ) {
99
+ if ! k .Defined () {
100
+ return - 1 , ErrNotFound
101
+ }
102
+
71
103
if has , blockSize , ok := b .queryCache (k ); ok {
72
104
if ! has {
73
105
// don't have it, return
@@ -79,6 +111,11 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
79
111
}
80
112
// we have it but don't know the size, ask the datastore.
81
113
}
114
+
115
+ lk := b .getLock (k )
116
+ lk .RLock ()
117
+ defer lk .RUnlock ()
118
+
82
119
blockSize , err := b .blockstore .GetSize (k )
83
120
if err == ErrNotFound {
84
121
b .cacheHave (k , false )
@@ -100,7 +137,6 @@ func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
100
137
}
101
138
102
139
if ! k .Defined () {
103
- log .Error ("undefined cid in arc cache" )
104
140
return ErrNotFound
105
141
}
106
142
@@ -110,19 +146,26 @@ func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
110
146
return ErrNotFound
111
147
}
112
148
149
+ lk := b .getLock (k )
150
+ lk .RLock ()
151
+ defer lk .RUnlock ()
152
+
113
153
return b .viewer .View (k , callback )
114
154
}
115
155
116
156
func (b * arccache ) Get (k cid.Cid ) (blocks.Block , error ) {
117
157
if ! k .Defined () {
118
- log .Error ("undefined cid in arc cache" )
119
158
return nil , ErrNotFound
120
159
}
121
160
122
161
if has , _ , ok := b .queryCache (k ); ok && ! has {
123
162
return nil , ErrNotFound
124
163
}
125
164
165
+ lk := b .getLock (k )
166
+ lk .RLock ()
167
+ defer lk .RUnlock ()
168
+
126
169
bl , err := b .blockstore .Get (k )
127
170
if bl == nil && err == ErrNotFound {
128
171
b .cacheHave (k , false )
@@ -137,6 +180,10 @@ func (b *arccache) Put(bl blocks.Block) error {
137
180
return nil
138
181
}
139
182
183
+ lk := b .getLock (bl .Cid ())
184
+ lk .Lock ()
185
+ defer lk .Unlock ()
186
+
140
187
err := b .blockstore .Put (bl )
141
188
if err == nil {
142
189
b .cacheSize (bl .Cid (), len (bl .RawData ()))
@@ -145,14 +192,31 @@ func (b *arccache) Put(bl blocks.Block) error {
145
192
}
146
193
147
194
func (b * arccache ) PutMany (bs []blocks.Block ) error {
195
+ mxs := [256 ]* sync.RWMutex {}
148
196
var good []blocks.Block
149
197
for _ , block := range bs {
150
198
// call put on block if result is inconclusive or we are sure that
151
199
// the block isn't in storage
152
200
if has , _ , ok := b .queryCache (block .Cid ()); ! ok || (ok && ! has ) {
153
201
good = append (good , block )
202
+ mxs [mutexKey (block .Cid ())] = & b .lks [mutexKey (block .Cid ())]
203
+ }
204
+ }
205
+
206
+ for _ , mx := range mxs {
207
+ if mx != nil {
208
+ mx .Lock ()
154
209
}
155
210
}
211
+
212
+ defer func () {
213
+ for _ , mx := range mxs {
214
+ if mx != nil {
215
+ mx .Unlock ()
216
+ }
217
+ }
218
+ }()
219
+
156
220
err := b .blockstore .PutMany (good )
157
221
if err != nil {
158
222
return err
0 commit comments