@@ -645,11 +645,18 @@ impl DiskCacheStore {
645
645
. await ;
646
646
647
647
guard. cancel ( ) ;
648
+ drop ( guard) ;
649
+
650
+ // need take all correspond cache_key's notifiers from request_notifiers to
651
+ // prevent future cancelled
652
+ let notifiers_vec: Vec < _ > = need_fetch_block_cache_key
653
+ . iter ( )
654
+ . map ( |cache_key| self . request_notifiers . take_notifiers ( cache_key) . unwrap ( ) )
655
+ . collect ( ) ;
648
656
649
657
let fetched_bytes = match fetched_bytes {
650
658
Err ( err) => {
651
- for cache_key in & need_fetch_block_cache_key {
652
- let notifiers = self . request_notifiers . take_notifiers ( cache_key) . unwrap ( ) ;
659
+ for notifiers in notifiers_vec {
653
660
for notifier in notifiers {
654
661
if let Err ( e) = notifier. send ( Err ( Error :: WaitNotifier {
655
662
message : err. to_string ( ) ,
@@ -664,14 +671,12 @@ impl DiskCacheStore {
664
671
Ok ( v) => v,
665
672
} ;
666
673
667
- for ( bytes, cache_key) in fetched_bytes
674
+ for ( ( bytes, notifiers ) , cache_key) in fetched_bytes
668
675
. into_iter ( )
669
- . zip ( need_fetch_block_cache_key. iter ( ) )
676
+ . zip ( notifiers_vec. into_iter ( ) )
677
+ . zip ( need_fetch_block_cache_key. into_iter ( ) )
670
678
{
671
- let notifiers = self . request_notifiers . take_notifiers ( cache_key) . unwrap ( ) ;
672
- self . cache
673
- . insert_data ( cache_key. clone ( ) , bytes. clone ( ) )
674
- . await ;
679
+ self . cache . insert_data ( cache_key, bytes. clone ( ) ) . await ;
675
680
for notifier in notifiers {
676
681
if let Err ( e) = notifier. send ( Ok ( bytes. clone ( ) ) ) {
677
682
error ! ( "Failed to send notifier success result, err:{:?}." , e) ;
0 commit comments