@@ -2,6 +2,7 @@ package blockstore
2
2
3
3
import (
4
4
"context"
5
+ "sync"
5
6
6
7
lru "github.com/hashicorp/golang-lru"
7
8
blocks "github.com/ipfs/go-block-format"
@@ -16,7 +17,10 @@ type cacheSize int
16
17
// block Cids. This provides block access-time improvements, allowing
17
18
// to short-cut many searches without query-ing the underlying datastore.
18
19
type arccache struct {
19
- arc * lru.TwoQueueCache
20
+ arc * lru.TwoQueueCache
21
+
22
+ arcLks sync.Map
23
+
20
24
blockstore Blockstore
21
25
22
26
hits metrics.Counter
@@ -28,28 +32,72 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
28
32
if err != nil {
29
33
return nil , err
30
34
}
31
- c := & arccache {arc : arc , blockstore : bs }
35
+ c := & arccache {arc : arc , arcLks : sync. Map {}, blockstore : bs }
32
36
c .hits = metrics .NewCtx (ctx , "arc.hits_total" , "Number of ARC cache hits" ).Counter ()
33
37
c .total = metrics .NewCtx (ctx , "arc_total" , "Total number of ARC cache requests" ).Counter ()
34
38
35
39
return c , nil
36
40
}
37
41
38
- func (b * arccache ) DeleteBlock (k cid.Cid ) error {
39
- if has , _ , ok := b .hasCached (k ); ok && ! has {
40
- return nil
42
+ // if ok == false has is inconclusive, calling release is a noop
43
+ // if ok == true then has response to question: is it contained and a lock will be held on content `k` until `release` is called.
44
+ // it is the callers responsibility to call `release` when they are done writing/reading data.
45
+ func (b * arccache ) hasCachedSync (k cid.Cid ) (has bool , size int , ok bool , release func ()) {
46
+ // default return ops
47
+ has = false
48
+ size = - 1
49
+ ok = false
50
+ release = func () {}
51
+
52
+ b .total .Inc ()
53
+ if ! k .Defined () {
54
+ log .Error ("undefined cid in arccache" )
55
+ // Return cache invalid so the call to blockstore happens
56
+ // in case of invalid key and correct error is created.
57
+ return
41
58
}
42
59
43
- b .arc .Remove (k ) // Invalidate cache before deleting.
44
- err := b .blockstore .DeleteBlock (k )
45
- if err == nil {
46
- b .cacheHave (k , false )
60
+ h , ok := b .arc .Get (string (k .Hash ()))
61
+ if ok {
62
+ b .hits .Inc ()
63
+ switch h := h .(type ) {
64
+ case cacheHave :
65
+ has = bool (h )
66
+ size = - 1
67
+ ok = true
68
+ case cacheSize :
69
+ has = true
70
+ size = int (h )
71
+ ok = true
72
+ }
47
73
}
48
- return err
74
+
75
+ // check if we have a lock for this content.
76
+ v , hasLk := b .arcLks .Load (k )
77
+ if has && hasLk {
78
+ // cache and lock hit.
79
+ lk := v .(* sync.Mutex )
80
+ lk .Lock ()
81
+ release = func () { lk .Unlock () }
82
+
83
+ return
84
+ } else if has && ! hasLk {
85
+ // cache hit and lock miss, create the lock, lock it, and add it to the lockMap
86
+ lk := new (sync.Mutex )
87
+ b .arcLks .Store (k , lk )
88
+
89
+ lk .Lock ()
90
+ release = func () { lk .Unlock () }
91
+ } else if ! has && hasLk {
92
+ // cache miss and lock hit, remove lock from map
93
+ b .arcLks .Delete (k )
94
+ }
95
+ // else cache miss and lock miss, noop
96
+ return
49
97
}
50
98
51
99
// if ok == false has is inconclusive
52
- // if ok == true then has respons to question: is it contained
100
+ // if ok == true then has response to question: is it contained
53
101
func (b * arccache ) hasCached (k cid.Cid ) (has bool , size int , ok bool ) {
54
102
b .total .Inc ()
55
103
if ! k .Defined () {
@@ -72,6 +120,21 @@ func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) {
72
120
return false , - 1 , false
73
121
}
74
122
123
+ func (b * arccache ) DeleteBlock (k cid.Cid ) error {
124
+ has , _ , ok , release := b .hasCachedSync (k )
125
+ defer release ()
126
+ if ok && ! has {
127
+ return nil
128
+ }
129
+
130
+ b .arc .Remove (k ) // Invalidate cache before deleting.
131
+ err := b .blockstore .DeleteBlock (k )
132
+ if err == nil {
133
+ b .cacheHave (k , false )
134
+ }
135
+ return err
136
+ }
137
+
75
138
func (b * arccache ) Has (k cid.Cid ) (bool , error ) {
76
139
if has , _ , ok := b .hasCached (k ); ok {
77
140
return has , nil
@@ -85,7 +148,9 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
85
148
}
86
149
87
150
func (b * arccache ) GetSize (k cid.Cid ) (int , error ) {
88
- if has , blockSize , ok := b .hasCached (k ); ok {
151
+ has , blockSize , ok , release := b .hasCachedSync (k )
152
+ defer release ()
153
+ if ok {
89
154
if ! has {
90
155
// don't have it, return
91
156
return - 1 , ErrNotFound
@@ -111,7 +176,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
111
176
return nil , ErrNotFound
112
177
}
113
178
114
- if has , _ , ok := b .hasCached (k ); ok && ! has {
179
+ has , _ , ok , release := b .hasCachedSync (k )
180
+ defer release ()
181
+ if ok && ! has {
115
182
return nil , ErrNotFound
116
183
}
117
184
@@ -125,7 +192,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
125
192
}
126
193
127
194
func (b * arccache ) Put (bl blocks.Block ) error {
128
- if has , _ , ok := b .hasCached (bl .Cid ()); ok && has {
195
+ has , _ , ok , release := b .hasCachedSync (bl .Cid ())
196
+ defer release ()
197
+ if ok && has {
129
198
return nil
130
199
}
131
200
@@ -141,9 +210,11 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
141
210
for _ , block := range bs {
142
211
// call put on block if result is inconclusive or we are sure that
143
212
// the block isn't in storage
144
- if has , _ , ok := b .hasCached (block .Cid ()); ! ok || (ok && ! has ) {
213
+ has , _ , ok , release := b .hasCachedSync (block .Cid ())
214
+ if ! ok || (ok && ! has ) {
145
215
good = append (good , block )
146
216
}
217
+ defer release ()
147
218
}
148
219
err := b .blockstore .PutMany (good )
149
220
if err != nil {
0 commit comments