Skip to content

Commit f81e20f

Browse files
committed
Add test
1 parent 96f89b6 commit f81e20f

File tree

4 files changed

+42
-26
lines changed

4 files changed

+42
-26
lines changed

server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java

+16-12
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,9 @@ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
201201
);
202202
}
203203
catch (Throwable th) {
204-
log.error("Received error while fetching historical capabilities from Server[%s].", serverId);
205204
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
206205
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
207-
.build(th, "Failed to fetch historical capabilities");
206+
.build(th, "Received error while fetching historical capabilities from Server[%s].", serverId);
208207
}
209208
}
210209

@@ -216,16 +215,7 @@ private void doSegmentManagement()
216215
}
217216

218217
final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
219-
int batchSize;
220-
if (config.getBatchSize() != null) {
221-
batchSize = config.getBatchSize();
222-
} else if (SegmentLoadingMode.TURBO.equals(loadingMode)) {
223-
batchSize = serverCapabilities.getNumTurboLoadingThreads();
224-
} else if (SegmentLoadingMode.NORMAL.equals(loadingMode)) {
225-
batchSize = serverCapabilities.getNumLoadingThreads();
226-
} else {
227-
throw DruidException.defensive().build("unsupported loading mode");
228-
}
218+
int batchSize = calculateBatchSize(loadingMode);
229219

230220
final List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
231221

@@ -391,6 +381,20 @@ private void logRequestFailure(Throwable t)
391381
}
392382
}
393383

384+
@VisibleForTesting
385+
int calculateBatchSize(SegmentLoadingMode loadingMode)
386+
{
387+
if (config.getBatchSize() != null) {
388+
return config.getBatchSize();
389+
} else if (SegmentLoadingMode.TURBO.equals(loadingMode)) {
390+
return serverCapabilities.getNumTurboLoadingThreads();
391+
} else if (SegmentLoadingMode.NORMAL.equals(loadingMode)) {
392+
return serverCapabilities.getNumLoadingThreads();
393+
} else {
394+
throw DruidException.defensive().build("unsupported loading mode");
395+
}
396+
}
397+
394398
private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentChangeStatus status)
395399
{
396400
changeRequest.go(

server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ public void testProcessBatch() throws Exception
245245
runnable.run();
246246
}
247247

248-
result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1)), SegmentLoadingMode.NORMAL).get();
248+
result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1)), SegmentLoadingMode.TURBO).get();
249249
Assert.assertEquals(SegmentChangeStatus.SUCCESS, result.get(0).getStatus());
250250

251251
Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.getObservedSegments());

server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void setUp()
103103
),
104104
httpClient.callbackExecutor,
105105
() -> SegmentLoadingMode.NORMAL,
106-
new HistoricalLoadingCapabilities(1, 1)
106+
new HistoricalLoadingCapabilities(1, 3)
107107
);
108108
httpLoadQueuePeon.start();
109109
}
@@ -333,6 +333,30 @@ public void testLoadRateIsChangedWhenLoadSucceeds() throws InterruptedException
333333
);
334334
}
335335

336+
@Test
337+
public void testBatchSize()
338+
{
339+
Assert.assertEquals(10, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
340+
341+
httpLoadQueuePeon = new HttpLoadQueuePeon(
342+
"http://dummy:4000",
343+
MAPPER,
344+
httpClient,
345+
new HttpLoadQueuePeonConfig(null, null, null),
346+
new WrappingScheduledExecutorService(
347+
"HttpLoadQueuePeonTest-%s",
348+
httpClient.processingExecutor,
349+
true
350+
),
351+
httpClient.callbackExecutor,
352+
() -> SegmentLoadingMode.NORMAL,
353+
new HistoricalLoadingCapabilities(1, 3)
354+
);
355+
356+
Assert.assertEquals(1, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
357+
Assert.assertEquals(3, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.TURBO));
358+
}
359+
336360
private LoadPeonCallback markSegmentProcessed(DataSegment segment)
337361
{
338362
return success -> httpClient.processedSegments.add(segment);

server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java

-12
Original file line numberDiff line numberDiff line change
@@ -545,18 +545,6 @@ public void testTurboLoadingNodes()
545545
Assert.assertEquals(SegmentLoadingMode.TURBO, config.getLoadingModeForServer("localhost:8083"));
546546
}
547547

548-
@Test
549-
public void testTurboLoadingNodes()
550-
{
551-
CoordinatorDynamicConfig config = CoordinatorDynamicConfig
552-
.builder()
553-
.withTurboLoadingNodes(ImmutableSet.of("localhost:8083"))
554-
.build();
555-
556-
Assert.assertEquals(SegmentLoadingMode.NORMAL, config.getLoadingModeForServer("localhost:8082"));
557-
Assert.assertEquals(SegmentLoadingMode.TURBO, config.getLoadingModeForServer("localhost:8083"));
558-
}
559-
560548
@Test
561549
public void testEqualsAndHashCode()
562550
{

0 commit comments

Comments
 (0)