Skip to content

Commit ee7db7d

Browse files
authored
Simplify data consistency check code (#22504)
1 parent f1a50e1 commit ee7db7d

File tree

4 files changed

+19
-18
lines changed

4 files changed

+19
-18
lines changed

kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,10 @@ private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm
104104
Collection<String> columnNames = tableMetaData.getColumnNames();
105105
Map<String, Object> tableCheckPositions = jobItemContext.getTableCheckPositions();
106106
DataConsistencyCalculateParameter sourceParam = buildParameter(
107-
sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey,
108-
tableCheckPositions.get(sourceTableName));
107+
sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey, tableCheckPositions.get(sourceTableName));
109108
String targetTableName = targetTable.getTableName().getOriginal();
110-
DataConsistencyCalculateParameter targetParam = buildParameter(
111-
targetDataSource, targetTable.getSchemaName().getOriginal(), targetTableName, columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey,
112-
tableCheckPositions.get(targetTableName));
109+
DataConsistencyCalculateParameter targetParam = buildParameter(targetDataSource, targetTable.getSchemaName().getOriginal(), targetTableName,
110+
columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey, tableCheckPositions.get(targetTableName));
113111
Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParam).iterator();
114112
Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParam).iterator();
115113
long sourceRecordsCount = 0;
@@ -131,10 +129,10 @@ private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm
131129
break;
132130
}
133131
if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
134-
jobItemContext.getTableCheckPositions().put(sourceTableName, sourceCalculatedResult.getMaxUniqueKeyValue().get());
132+
tableCheckPositions.put(sourceTableName, sourceCalculatedResult.getMaxUniqueKeyValue().get());
135133
}
136134
if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
137-
jobItemContext.getTableCheckPositions().put(targetTableName, targetCalculatedResult.getMaxUniqueKeyValue().get());
135+
tableCheckPositions.put(targetTableName, targetCalculatedResult.getMaxUniqueKeyValue().get());
138136
}
139137
jobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
140138
}

kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
6060

6161
private final long checkBeginTimeMillis;
6262

63-
private Long checkEndTimeMillis;
63+
private volatile Long checkEndTimeMillis;
6464

6565
private final Map<String, Object> tableCheckPositions = new ConcurrentHashMap<>();
6666

kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,13 @@ protected void runBlocking() {
9999
DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(
100100
parentJobConfig, checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
101101
setCalculateAlgorithm(calculateAlgorithm);
102-
Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext);
103-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
104-
jobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
102+
Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
103+
try {
104+
dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext);
105+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
106+
} finally {
107+
jobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
108+
}
105109
}
106110

107111
@Override

kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,12 @@ public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalcul
8080
try (
8181
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getSource());
8282
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
83-
// TODO simplify code
84-
MigrationJobAPI migrationJobAPI = MigrationJobAPIFactory.getInstance();
85-
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = migrationJobAPI.getJobProgress(jobConfig);
86-
long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
87-
checkJobItemContext.setRecordsCount(recordsCount);
83+
checkJobItemContext.setRecordsCount(getRecordsCount());
8884
checkJobItemContext.getTableNames().add(jobConfig.getSourceTableName());
8985
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
9086
SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), sourceDataSource, targetDataSource,
9187
sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
9288
result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
93-
// TODO make sure checkEndTimeMillis will be set
94-
checkJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
9589
} catch (final SQLException ex) {
9690
throw new SQLWrapperException(ex);
9791
}
@@ -102,4 +96,9 @@ private void verifyPipelineDatabaseType(final DataConsistencyCalculateAlgorithm
10296
ShardingSpherePreconditions.checkState(calculateAlgorithm.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType().getType()),
10397
() -> new UnsupportedPipelineDatabaseTypeException(dataSourceConfig.getDatabaseType()));
10498
}
99+
100+
private long getRecordsCount() {
101+
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = MigrationJobAPIFactory.getInstance().getJobProgress(jobConfig);
102+
return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
103+
}
105104
}

0 commit comments

Comments (0)