Skip to content

Commit dfc20f9

Browse files
authored
More logging cleanup on Overlord (#17780)
1 parent 7182699 commit dfc20f9

File tree

8 files changed

+35
-39
lines changed

8 files changed

+35
-39
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java

+15-15
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ private boolean processBatch(AllocateRequestBatch requestBatch)
341341
}
342342

343343
log.debug(
344-
"Processing [%d] requests for batch [%s], queue time [%s].",
344+
"Processing [%d] requests for batch[%s], queue time[%s].",
345345
requestBatch.size(), requestKey, requestBatch.getQueueTime()
346346
);
347347

@@ -355,21 +355,21 @@ private boolean processBatch(AllocateRequestBatch requestBatch)
355355

356356
emitBatchMetric("task/action/batch/attempts", 1L, requestKey);
357357
emitBatchMetric("task/action/batch/runTime", (System.currentTimeMillis() - startTimeMillis), requestKey);
358-
log.info("Successfully processed [%d / %d] requests in batch [%s].", successCount, batchSize, requestKey);
358+
log.debug("Successfully processed [%d / %d] requests in batch[%s].", successCount, batchSize, requestKey);
359359

360360
if (requestBatch.isEmpty()) {
361361
return true;
362362
}
363363

364364
// Requeue the batch only if used segments have changed
365-
log.debug("There are [%d] failed requests in batch [%s].", requestBatch.size(), requestKey);
365+
log.debug("There are [%d] failed requests in batch[%s].", requestBatch.size(), requestKey);
366366
final Set<DataSegment> updatedUsedSegments = retrieveUsedSegments(requestKey);
367367

368368
if (updatedUsedSegments.equals(usedSegments)) {
369369
log.warn(
370-
"Completing [%d] failed requests in batch [%s] with null value as there"
370+
"Completing [%d] failed requests in batch[%s] with null value as there"
371371
+ " are conflicting segments. Cannot retry allocation until the set of"
372-
+ " used segments overlapping the allocation interval [%s] changes.",
372+
+ " used segments overlapping the allocation interval[%s] changes.",
373373
size(), requestKey, requestKey.preferredAllocationInterval
374374
);
375375

@@ -452,9 +452,9 @@ private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set<Data
452452
}
453453

454454
if (!requestsWithPartialOverlap.isEmpty()) {
455-
log.info(
456-
"Found [%d] requests in batch [%s] with row intervals that partially overlap existing segments."
457-
+ " These cannot be processed until the set of used segments changes. Example request: [%s]",
455+
log.warn(
456+
"Found [%d] requests in batch[%s] with row intervals that partially overlap existing segments."
457+
+ " These cannot be processed until the set of used segments changes. Example request[%s]",
458458
requestsWithPartialOverlap.size(), requestBatch.key, requestsWithPartialOverlap.get(0)
459459
);
460460
}
@@ -485,7 +485,7 @@ private int allocateSegmentsForInterval(
485485

486486
final AllocateRequestKey requestKey = requestBatch.key;
487487
log.debug(
488-
"Trying allocation for [%d] requests, interval [%s] in batch [%s]",
488+
"Trying allocation for [%d] requests, interval[%s] in batch[%s]",
489489
requests.size(), tryInterval, requestKey
490490
);
491491

@@ -531,14 +531,14 @@ private void emitTaskMetric(String metric, long value, SegmentAllocateRequest re
531531
{
532532
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
533533
IndexTaskUtils.setTaskDimensions(metricBuilder, request.getTask());
534-
metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
534+
metricBuilder.setDimension(DruidMetrics.TASK_ACTION_TYPE, SegmentAllocateAction.TYPE);
535535
emitter.emit(metricBuilder.setMetric(metric, value));
536536
}
537537

538538
private void emitBatchMetric(String metric, long value, AllocateRequestKey key)
539539
{
540540
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
541-
metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
541+
metricBuilder.setDimension(DruidMetrics.TASK_ACTION_TYPE, SegmentAllocateAction.TYPE);
542542
metricBuilder.setDimension(DruidMetrics.DATASOURCE, key.dataSource);
543543
metricBuilder.setDimension(DruidMetrics.INTERVAL, key.preferredAllocationInterval.toString());
544544
emitter.emit(metricBuilder.setMetric(metric, value));
@@ -651,15 +651,15 @@ void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request)
651651
emitTaskMetric("task/action/success/count", 1L, request);
652652
requestToFuture.remove(request).complete(result.getSegmentId());
653653
} else if (request.canRetry()) {
654-
log.info(
655-
"Allocation failed on attempt [%d] due to error [%s]. Can still retry action [%s].",
654+
log.debug(
655+
"Allocation failed on attempt [%d] due to error[%s]. Can still retry action[%s].",
656656
request.getAttempts(), result.getErrorMessage(), request.getAction()
657657
);
658658
} else {
659659
emitTaskMetric("task/action/failed/count", 1L, request);
660660
log.error(
661-
"Exhausted max attempts [%d] for allocation with latest error [%s]."
662-
+ " Completing action [%s] with a null value.",
661+
"Exhausted max attempts[%d] for allocation with latest error[%s]."
662+
+ " Completing action[%s] with a null value.",
663663
request.getAttempts(), result.getErrorMessage(), request.getAction()
664664
);
665665
requestToFuture.remove(request).complete(null);

indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ public List<SegmentAllocateResult> allocateSegments(
474474
boolean reduceMetadataIO
475475
)
476476
{
477-
log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval);
477+
log.debug("Allocating [%d] segments for datasource[%s], interval[%s]", requests.size(), dataSource, interval);
478478
final boolean isTimeChunkLock = lockGranularity == LockGranularity.TIME_CHUNK;
479479

480480
final AllocationHolderList holderList = new AllocationHolderList(requests, interval);
@@ -1132,11 +1132,11 @@ private void unlock(final Task task, final Interval interval, @Nullable Integer
11321132

11331133
if (match) {
11341134
// Remove task from live list
1135-
log.info("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock);
1135+
log.debug("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock);
11361136
final boolean removed = taskLockPosse.removeTask(task);
11371137

11381138
if (taskLockPosse.isTasksEmpty()) {
1139-
log.info("TaskLock[%s] is now empty.", taskLock);
1139+
log.debug("TaskLock[%s] is now empty.", taskLock);
11401140
possesHolder.remove(taskLockPosse);
11411141
}
11421142
if (possesHolder.isEmpty()) {

indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ private void manageInternalCritical(
440440
continue;
441441
}
442442
if (taskIsReady) {
443-
log.info("Asking taskRunner to run: %s", task.getId());
443+
log.info("Asking taskRunner to run task[%s]", task.getId());
444444
runnerTaskFuture = taskRunner.run(task);
445445
} else {
446446
// Task.isReady() can internally lock intervals or segments.
@@ -469,7 +469,7 @@ private void manageInternalPostCritical(
469469
// Kill tasks that shouldn't be running
470470
final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
471471
if (!tasksToKill.isEmpty()) {
472-
log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
472+
log.info("Asking taskRunner to clean up [%,d] tasks.", tasksToKill.size());
473473

474474
// On large installations running several thousands of tasks,
475475
// concatenating the list of known task ids can be compupationally expensive.
@@ -483,7 +483,7 @@ private void manageInternalPostCritical(
483483
taskRunner.shutdown(taskId, reason);
484484
}
485485
catch (Exception e) {
486-
log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
486+
log.warn(e, "TaskRunner failed to clean up task[%s].", taskId);
487487
}
488488
}
489489
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3222,7 +3222,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
32223222

32233223
final boolean stopTasksEarly;
32243224
if (earlyStopTime != null && !earlyStopTime.isAfterNow()) {
3225-
log.info("Early stop requested, signalling tasks to complete.");
3225+
log.info("Early stop requested for supervisor[%s], signalling tasks to complete.", supervisorId);
32263226
earlyStopTime = null;
32273227
stopTasksEarly = true;
32283228
} else {

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ private Runnable computeAndCollectLag()
178178
}
179179
log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
180180
} else {
181-
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
181+
log.debug("Supervisor[%s] is suspended, skipping lag collection", dataSource);
182182
}
183183
}
184184
catch (Exception e) {
@@ -237,7 +237,7 @@ private int computeDesiredTaskCount(List<Long> lags)
237237

238238
int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
239239
if (currentActiveTaskCount == actualTaskCountMax) {
240-
log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
240+
log.debug("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
241241
dataSource
242242
);
243243
emitter.emit(metricBuilder
@@ -258,7 +258,7 @@ private int computeDesiredTaskCount(List<Long> lags)
258258
int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
259259
int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
260260
if (currentActiveTaskCount == actualTaskCountMin) {
261-
log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
261+
log.debug("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource[%s].",
262262
dataSource
263263
);
264264
emitter.emit(metricBuilder

processing/src/main/java/org/apache/druid/query/DruidMetrics.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -25,27 +25,29 @@
2525
import java.util.List;
2626

2727
/**
28+
* Contains dimension names used while emitting metrics.
2829
*/
2930
public class DruidMetrics
3031
{
32+
// Query dimensions
3133
public static final String DATASOURCE = "dataSource";
3234
public static final String TYPE = "type";
3335
public static final String INTERVAL = "interval";
3436
public static final String ID = "id";
3537
public static final String SUBQUERY_ID = "subQueryId";
36-
public static final String TASK_ID = "taskId";
37-
public static final String GROUP_ID = "groupId";
3838
public static final String STATUS = "status";
39-
public static final String TASK_INGESTION_MODE = "taskIngestionMode";
40-
41-
public static final String PARTITIONING_TYPE = "partitioningType";
4239

43-
// task metrics
40+
// Task dimensions
41+
public static final String TASK_ID = "taskId";
42+
public static final String GROUP_ID = "groupId";
4443
public static final String TASK_TYPE = "taskType";
4544
public static final String TASK_STATUS = "taskStatus";
4645

46+
// Ingestion dimensions
47+
public static final String PARTITIONING_TYPE = "partitioningType";
48+
public static final String TASK_INGESTION_MODE = "taskIngestionMode";
49+
public static final String TASK_ACTION_TYPE = "taskActionType";
4750
public static final String STREAM = "stream";
48-
4951
public static final String PARTITION = "partition";
5052

5153
public static final String TAGS = "tags";

processing/src/main/java/org/apache/druid/query/Druids.java

-5
Original file line numberDiff line numberDiff line change
@@ -1065,9 +1065,4 @@ public static DataSourceMetadataQueryBuilder newDataSourceMetadataQueryBuilder()
10651065
{
10661066
return new DataSourceMetadataQueryBuilder();
10671067
}
1068-
1069-
public static FilteredDataSource filteredDataSource(DataSource base, DimFilter filter)
1070-
{
1071-
return FilteredDataSource.create(base, filter);
1072-
}
10731068
}

server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -1261,7 +1261,7 @@ private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
12611261
if (createdSegment != null) {
12621262
pendingSegments.add(createdSegment.getId());
12631263
uniqueRequestToSegment.put(uniqueRequest, createdSegment);
1264-
log.info("Created new segment[%s]", createdSegment.getId());
1264+
log.debug("Created new segment[%s]", createdSegment.getId());
12651265
}
12661266
}
12671267

@@ -1270,7 +1270,6 @@ private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
12701270
}
12711271
}
12721272

1273-
log.info("Created [%d] new segments for [%d] allocate requests.", uniqueRequestToSegment.size(), requests.size());
12741273
return createdSegments;
12751274
}
12761275

0 commit comments

Comments
 (0)