@@ -19,7 +19,7 @@ type cacheSize int
19
19
// to short-cut many searches without querying the underlying datastore.
20
20
type arccache struct {
21
21
cache * lru.TwoQueueCache
22
- lks [256 ]sync.Mutex
22
+ lks [256 ]sync.RWMutex
23
23
24
24
blockstore Blockstore
25
25
viewer Viewer
@@ -36,7 +36,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
36
36
if err != nil {
37
37
return nil , err
38
38
}
39
- c := & arccache {cache : cache , lks : [ 256 ]sync. Mutex {}, blockstore : bs }
39
+ c := & arccache {cache : cache , blockstore : bs }
40
40
c .hits = metrics .NewCtx (ctx , "arc.hits_total" , "Number of ARC cache hits" ).Counter ()
41
41
c .total = metrics .NewCtx (ctx , "arc_total" , "Total number of ARC cache requests" ).Counter ()
42
42
if v , ok := bs .(Viewer ); ok {
@@ -45,13 +45,27 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
45
45
return c , nil
46
46
}
47
47
48
+ func (b * arccache ) rLock (k cid.Cid ) func () {
49
+ b .lks [k .KeyString ()[len (k .KeyString ())- 1 ]].RLock ()
50
+ return func () {
51
+ b .lks [k .KeyString ()[len (k .KeyString ())- 1 ]].RUnlock ()
52
+ }
53
+ }
54
+
55
+ func (b * arccache ) wLock (k cid.Cid ) func () {
56
+ b .lks [k .KeyString ()[len (k .KeyString ())- 1 ]].Lock ()
57
+ return func () {
58
+ b .lks [k .KeyString ()[len (k .KeyString ())- 1 ]].Unlock ()
59
+ }
60
+ }
61
+
48
62
func (b * arccache ) DeleteBlock (k cid.Cid ) error {
49
63
if has , _ , ok := b .queryCache (k ); ok && ! has {
50
64
return nil
51
65
}
52
66
53
- b . lks [ k . Bytes ()[ len ( k . Bytes ()) - 1 ]]. Lock ( )
54
- defer b . lks [ k . Bytes ()[ len ( k . Bytes ()) - 1 ]]. Unlock ()
67
+ unlock := b . wLock ( k )
68
+ defer unlock ()
55
69
56
70
b .cache .Remove (k ) // Invalidate cache before deleting.
57
71
err := b .blockstore .DeleteBlock (k )
@@ -66,6 +80,9 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
66
80
return has , nil
67
81
}
68
82
83
+ unlock := b .rLock (k )
84
+ defer unlock ()
85
+
69
86
has , err := b .blockstore .Has (k )
70
87
if err != nil {
71
88
return false , err
@@ -87,8 +104,8 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
87
104
// we have it but don't know the size, ask the datastore.
88
105
}
89
106
90
- b . lks [ k . Bytes ()[ len ( k . Bytes ()) - 1 ]]. Lock ( )
91
- defer b . lks [ k . Bytes ()[ len ( k . Bytes ()) - 1 ]]. Unlock ()
107
+ unlock := b . rLock ( k )
108
+ defer unlock ()
92
109
93
110
blockSize , err := b .blockstore .GetSize (k )
94
111
if err == ErrNotFound {
@@ -121,6 +138,9 @@ func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
121
138
return ErrNotFound
122
139
}
123
140
141
+ unlock := b .rLock (k )
142
+ defer unlock ()
143
+
124
144
return b .viewer .View (k , callback )
125
145
}
126
146
@@ -134,8 +154,8 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
134
154
return nil , ErrNotFound
135
155
}
136
156
137
- b . lks [ k . Bytes ()[ len ( k . Bytes ()) - 1 ]]. Lock ( )
138
- defer b . lks [ k . Bytes ()[ len ( k . Bytes ()) - 1 ]]. Unlock ()
157
+ unlock := b . rLock ( k )
158
+ defer unlock ()
139
159
140
160
bl , err := b .blockstore .Get (k )
141
161
if bl == nil && err == ErrNotFound {
@@ -151,8 +171,8 @@ func (b *arccache) Put(bl blocks.Block) error {
151
171
return nil
152
172
}
153
173
154
- b . lks [ bl . Cid (). Bytes ()[ len ( bl .Cid (). Bytes ()) - 1 ]]. Lock ( )
155
- defer b . lks [ bl . Cid (). Bytes ()[ len ( bl . Cid (). Bytes ()) - 1 ]]. Unlock ()
174
+ unlock := b . wLock ( bl .Cid ())
175
+ defer unlock ()
156
176
157
177
err := b .blockstore .Put (bl )
158
178
if err == nil {
@@ -162,28 +182,30 @@ func (b *arccache) Put(bl blocks.Block) error {
162
182
}
163
183
164
184
func (b * arccache ) PutMany (bs []blocks.Block ) error {
185
+ mxs := [256 ]* sync.Mutex {}
165
186
var good []blocks.Block
166
187
for _ , block := range bs {
167
188
// call put on block if result is inconclusive or we are sure that
168
189
// the block isn't in storage
169
190
if has , _ , ok := b .queryCache (block .Cid ()); ! ok || (ok && ! has ) {
170
191
good = append (good , block )
171
- b . lks [block .Cid ().Bytes ()[len (block .Cid ().Bytes ())- 1 ]]. Lock ()
192
+ mxs [block .Cid ().KeyString ()[len (block .Cid ().KeyString ())- 1 ]] = & sync. Mutex {}
172
193
}
173
194
}
174
195
175
- defer func () {
176
- for _ , block := range good {
177
- b . lks [ block . Cid (). Bytes ()[ len ( block . Cid (). Bytes ()) - 1 ]]. Unlock ()
196
+ for _ , mx := range mxs {
197
+ if mx != nil {
198
+ mx . Lock ()
178
199
}
179
- }()
200
+ }
180
201
181
202
err := b .blockstore .PutMany (good )
182
203
if err != nil {
183
204
return err
184
205
}
185
206
for _ , block := range good {
186
207
b .cacheSize (block .Cid (), len (block .RawData ()))
208
+ mxs [block .Cid ().KeyString ()[len (block .Cid ().KeyString ())- 1 ]].Unlock ()
187
209
}
188
210
return nil
189
211
}
0 commit comments