@@ -96,24 +96,9 @@ boolean isEmpty()
96
96
*/
97
97
boolean shouldRefreshUsedSegment (SegmentId segmentId , @ Nullable DateTime persistedUpdateTime )
98
98
{
99
- return withReadLock (() -> {
100
- final DataSegmentPlus cachedState = readSegmentsFor (segmentId .getInterval ())
101
- .idToUsedSegment .get (segmentId );
102
- return cachedState == null
103
- || shouldUpdateCache (cachedState .getUsedStatusLastUpdatedDate (), persistedUpdateTime );
104
- });
105
- }
106
-
107
- /**
108
- * Checks if a pending segment needs to be refreshed in the cache.
109
- */
110
- boolean shouldRefreshPendingSegment (PendingSegmentRecord record )
111
- {
112
- final SegmentIdWithShardSpec segmentId = record .getId ();
113
99
return withReadLock (
114
- () -> !readSegmentsFor (segmentId .getInterval ())
115
- .idToPendingSegment
116
- .containsKey (segmentId .toString ())
100
+ () -> readSegmentsFor (segmentId .getInterval ())
101
+ .shouldRefreshUsedSegment (segmentId , persistedUpdateTime )
117
102
);
118
103
}
119
104
@@ -139,60 +124,6 @@ private static boolean shouldUpdateCache(
139
124
}
140
125
}
141
126
142
- /**
143
- * Adds or updates the given segment in the cache.
144
- *
145
- * @return true if the segment was updated in the cache, false if the segment
146
- * was left unchanged in the cache.
147
- */
148
- boolean addSegment (DataSegmentPlus segmentPlus )
149
- {
150
- if (Boolean .TRUE .equals (segmentPlus .getUsed ())) {
151
- return addUsedSegment (segmentPlus );
152
- } else {
153
- return addUnusedSegmentId (
154
- segmentPlus .getDataSegment ().getId (),
155
- segmentPlus .getUsedStatusLastUpdatedDate ()
156
- );
157
- }
158
- }
159
-
160
- /**
161
- * Adds or updates a used segment in the cache.
162
- */
163
- private boolean addUsedSegment (DataSegmentPlus segmentPlus )
164
- {
165
- final DataSegment segment = segmentPlus .getDataSegment ();
166
- final SegmentId segmentId = segment .getId ();
167
-
168
- return withWriteLock (() -> {
169
- if (!shouldRefreshUsedSegment (segmentId , segmentPlus .getUsedStatusLastUpdatedDate ())) {
170
- return false ;
171
- }
172
-
173
- final SegmentsInInterval segments = writeSegmentsFor (segmentId .getInterval ());
174
- segments .idToUsedSegment .put (segmentId , segmentPlus );
175
- segments .unusedSegmentIdToUpdatedTime .remove (segment .getId ());
176
- return true ;
177
- });
178
- }
179
-
180
- /**
181
- * Adds or updates an unused segment in the cache.
182
- *
183
- * @param updatedTime Last updated time of this segment as persisted in the
184
- * metadata store. This value can be null for segments
185
- * persisted to the metadata store before the column
186
- * used_status_last_updated was added to the segments table.
187
- */
188
- boolean addUnusedSegmentId (SegmentId segmentId , @ Nullable DateTime updatedTime )
189
- {
190
- return withWriteLock (
191
- () -> writeSegmentsFor (segmentId .getInterval ())
192
- .addUnusedSegmentId (segmentId , updatedTime )
193
- );
194
- }
195
-
196
127
/**
197
128
* Atomically updates segment IDs in the cache based on the segments
198
129
* currently present in the metadata store.
@@ -219,9 +150,7 @@ SegmentSyncResult syncSegmentIds(List<SegmentRecord> persistedSegments, DateTime
219
150
220
151
if (record .isUsed ()) {
221
152
// Refresh this used segment if it has been updated in the metadata store
222
- final DataSegmentPlus cachedState = intervalSegments .idToUsedSegment .get (segmentId );
223
- if (cachedState == null
224
- || shouldUpdateCache (cachedState .getUsedStatusLastUpdatedDate (), record .getLastUpdatedTime ())) {
153
+ if (intervalSegments .shouldRefreshUsedSegment (segmentId , record .getLastUpdatedTime ())) {
225
154
usedSegmentIdsToRefresh .add (segmentId .toString ());
226
155
}
227
156
} else {
@@ -241,14 +170,23 @@ SegmentSyncResult syncSegmentIds(List<SegmentRecord> persistedSegments, DateTime
241
170
});
242
171
}
243
172
244
- SegmentSyncResult syncPendingSegments (List <PendingSegmentRecord > persistedPendingSegments , DateTime syncStartTime )
173
+ /**
174
+ * Atomically updates pending segments in the cache based on the segments
175
+ * currently present in the metadata store.
176
+ *
177
+ * @param persistedPendingSegments All pending segments present in the metadata store.
178
+ * @param syncStartTime Start time of the current sync
179
+ * @return Summary of updates made to the cache.
180
+ */
181
+ SegmentSyncResult syncPendingSegments (
182
+ List <PendingSegmentRecord > persistedPendingSegments ,
183
+ DateTime syncStartTime
184
+ )
245
185
{
246
186
return withWriteLock (() -> {
247
187
int numSegmentsUpdated = 0 ;
248
188
for (PendingSegmentRecord record : persistedPendingSegments ) {
249
- final boolean updated = shouldRefreshPendingSegment (record )
250
- && insertPendingSegment (record , false );
251
- if (updated ) {
189
+ if (insertPendingSegment (record , false )) {
252
190
++numSegmentsUpdated ;
253
191
}
254
192
}
@@ -264,7 +202,7 @@ SegmentSyncResult syncPendingSegments(List<PendingSegmentRecord> persistedPendin
264
202
* Removes all pending segments which are present in the cache but not present
265
203
* in the metadata store.
266
204
*/
267
- int removeUnpersistedPendingSegments (Set <String > persistedPendingSegmentIds , DateTime pollStartTime )
205
+ private int removeUnpersistedPendingSegments (Set <String > persistedPendingSegmentIds , DateTime pollStartTime )
268
206
{
269
207
return withWriteLock (() -> {
270
208
final Set <String > unpersistedSegmentIds =
@@ -284,7 +222,7 @@ && shouldUpdateCache(record.getCreatedDate(), pollStartTime)
284
222
* @param syncStartTime Start time of the current sync
285
223
* @return Number of unpersisted segments removed from cache.
286
224
*/
287
- int removeUnpersistedSegments (Set <SegmentId > persistedSegmentIds , DateTime syncStartTime )
225
+ private int removeUnpersistedSegments (Set <SegmentId > persistedSegmentIds , DateTime syncStartTime )
288
226
{
289
227
return withWriteLock (() -> {
290
228
final Set <SegmentId > unpersistedSegmentIds = new HashSet <>();
@@ -350,11 +288,17 @@ void markCacheSynced()
350
288
});
351
289
}
352
290
291
+ /**
292
+ * Must be accessed within a {@link #withReadLock} method.
293
+ */
353
294
private SegmentsInInterval readSegmentsFor (Interval interval )
354
295
{
355
296
return intervalToSegments .getOrDefault (interval , SegmentsInInterval .EMPTY );
356
297
}
357
298
299
+ /**
300
+ * Must be accessed within a {@link #withWriteLock} method.
301
+ */
358
302
private SegmentsInInterval writeSegmentsFor (Interval interval )
359
303
{
360
304
return intervalToSegments .computeIfAbsent (interval , i -> new SegmentsInInterval ());
@@ -535,7 +479,8 @@ public int insertSegments(Set<DataSegmentPlus> segments)
535
479
return withWriteLock (() -> {
536
480
int numInsertedSegments = 0 ;
537
481
for (DataSegmentPlus segmentPlus : segments ) {
538
- if (addSegment (segmentPlus )) {
482
+ final Interval interval = segmentPlus .getDataSegment ().getInterval ();
483
+ if (writeSegmentsFor (interval ).addSegment (segmentPlus )) {
539
484
++numInsertedSegments ;
540
485
}
541
486
}
@@ -553,19 +498,22 @@ public int insertSegmentsWithMetadata(Set<DataSegmentPlus> segments)
553
498
@ Override
554
499
public boolean markSegmentAsUnused (SegmentId segmentId , DateTime updateTime )
555
500
{
556
- return addUnusedSegmentId (segmentId , updateTime );
501
+ return writeSegmentsFor ( segmentId . getInterval ()). addUnusedSegmentId (segmentId , updateTime );
557
502
}
558
503
559
504
@ Override
560
505
public int markSegmentsAsUnused (Set <SegmentId > segmentIds , DateTime updateTime )
561
506
{
562
- int updatedCount = 0 ;
563
- for (SegmentId segmentId : segmentIds ) {
564
- if (addUnusedSegmentId (segmentId , updateTime )) {
565
- ++updatedCount ;
507
+ return withWriteLock (() -> {
508
+ int updatedCount = 0 ;
509
+ for (SegmentId segmentId : segmentIds ) {
510
+ final Interval interval = segmentId .getInterval ();
511
+ if (writeSegmentsFor (interval ).addUnusedSegmentId (segmentId , updateTime )) {
512
+ ++updatedCount ;
513
+ }
566
514
}
567
- }
568
- return updatedCount ;
515
+ return updatedCount ;
516
+ }) ;
569
517
}
570
518
571
519
@ Override
@@ -577,29 +525,38 @@ public int markSegmentsWithinIntervalAsUnused(
577
525
{
578
526
final Set <String > eligibleVersions = versions == null ? null : Set .copyOf (versions );
579
527
580
- int updatedCount = 0 ;
581
- for (DataSegmentPlus segment : findUsedSegmentsPlusOverlappingAnyOf (List .of (interval ))) {
582
- // Update segments with eligible versions or all versions (if eligibleVersions is null)
583
- if ((eligibleVersions == null || eligibleVersions .contains (segment .getDataSegment ().getVersion ()))
584
- && addUnusedSegmentId (segment .getDataSegment ().getId (), updateTime )) {
585
- ++updatedCount ;
528
+ return withWriteLock (() -> {
529
+ int updatedCount = 0 ;
530
+ for (DataSegmentPlus segmentPlus : findUsedSegmentsPlusOverlappingAnyOf (List .of (interval ))) {
531
+ // Update segments with eligible versions or all versions (if eligibleVersions is null)
532
+ final DataSegment segment = segmentPlus .getDataSegment ();
533
+ final boolean isEligibleVersion = eligibleVersions == null
534
+ || eligibleVersions .contains (segment .getVersion ());
535
+ if (isEligibleVersion
536
+ && writeSegmentsFor (segment .getInterval ()).addUnusedSegmentId (segment .getId (), updateTime )) {
537
+ ++updatedCount ;
538
+ }
586
539
}
587
- }
588
540
589
- return updatedCount ;
541
+ return updatedCount ;
542
+ });
590
543
}
591
544
592
545
@ Override
593
546
public int markAllSegmentsAsUnused (DateTime updateTime )
594
547
{
595
- int updatedCount = 0 ;
596
- for (DataSegmentPlus segment : findUsedSegmentsPlusOverlappingAnyOf (List .of ())) {
597
- if (addUnusedSegmentId (segment .getDataSegment ().getId (), updateTime )) {
598
- ++updatedCount ;
548
+ return withWriteLock (() -> {
549
+ int updatedCount = 0 ;
550
+ for (DataSegmentPlus segmentPlus : findUsedSegmentsPlusOverlappingAnyOf (List .of ())) {
551
+ final DataSegment segment = segmentPlus .getDataSegment ();
552
+ if (writeSegmentsFor (segment .getInterval ())
553
+ .addUnusedSegmentId (segment .getId (), updateTime )) {
554
+ ++updatedCount ;
555
+ }
599
556
}
600
- }
601
557
602
- return updatedCount ;
558
+ return updatedCount ;
559
+ });
603
560
}
604
561
605
562
@ Override
@@ -786,6 +743,49 @@ private void updateMaxUnusedId(SegmentId segmentId)
786
743
.merge (segmentId .getVersion (), segmentId .getPartitionNum (), Math ::max );
787
744
}
788
745
746
+ /**
747
+ * Adds or updates the given segment in the cache.
748
+ *
749
+ * @return true if the segment was updated in the cache, false if the segment
750
+ * was left unchanged in the cache.
751
+ */
752
+ boolean addSegment (DataSegmentPlus segmentPlus )
753
+ {
754
+ if (Boolean .TRUE .equals (segmentPlus .getUsed ())) {
755
+ return addUsedSegment (segmentPlus );
756
+ } else {
757
+ return addUnusedSegmentId (
758
+ segmentPlus .getDataSegment ().getId (),
759
+ segmentPlus .getUsedStatusLastUpdatedDate ()
760
+ );
761
+ }
762
+ }
763
+
764
+ /**
765
+ * Adds or updates a used segment in the cache.
766
+ */
767
+ private boolean addUsedSegment (DataSegmentPlus segmentPlus )
768
+ {
769
+ final DataSegment segment = segmentPlus .getDataSegment ();
770
+ final SegmentId segmentId = segment .getId ();
771
+
772
+ if (!shouldRefreshUsedSegment (segmentId , segmentPlus .getUsedStatusLastUpdatedDate ())) {
773
+ return false ;
774
+ }
775
+
776
+ idToUsedSegment .put (segmentId , segmentPlus );
777
+ unusedSegmentIdToUpdatedTime .remove (segment .getId ());
778
+ return true ;
779
+ }
780
+
781
+ /**
782
+ * Adds or updates an unused segment in the cache.
783
+ *
784
+ * @param updatedTime Last updated time of this segment as persisted in the
785
+ * metadata store. This value can be null for segments
786
+ * persisted to the metadata store before the column
787
+ * used_status_last_updated was added to the segments table.
788
+ */
789
789
private boolean addUnusedSegmentId (SegmentId segmentId , @ Nullable DateTime updatedTime )
790
790
{
791
791
idToUsedSegment .remove (segmentId );
@@ -799,5 +799,24 @@ private boolean addUnusedSegmentId(SegmentId segmentId, @Nullable DateTime updat
799
799
return false ;
800
800
}
801
801
}
802
+
803
+ private boolean shouldRefreshUnusedSegment (SegmentId segmentId , DateTime newUpdateTime )
804
+ {
805
+ return !unusedSegmentIdToUpdatedTime .containsKey (segmentId )
806
+ || shouldUpdateCache (unusedSegmentIdToUpdatedTime .get (segmentId ), newUpdateTime );
807
+ }
808
+
809
+ private boolean shouldRefreshUsedSegment (SegmentId segmentId , DateTime newUpdateTime )
810
+ {
811
+ final DataSegmentPlus usedSegment = idToUsedSegment .get (segmentId );
812
+
813
+ if (usedSegment == null ) {
814
+ // Do not refresh the segment if it has recently been marked as unused in the cache
815
+ return shouldRefreshUnusedSegment (segmentId , newUpdateTime );
816
+ } else {
817
+ // Refresh the used segment if the entry in the cache is stale
818
+ return shouldUpdateCache (usedSegment .getUsedStatusLastUpdatedDate (), newUpdateTime );
819
+ }
820
+ }
802
821
}
803
822
}
0 commit comments