Skip to content

Commit f28d5f3

Browse files
authored
Only run normalization when needed (#16794)
Only run normalization when records have been committed
1 parent bded0ac commit f28d5f3

File tree

10 files changed

+265
-9
lines changed

10 files changed

+265
-9
lines changed

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java

+13
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
3737
import io.airbyte.db.jdbc.JdbcUtils;
3838
import io.airbyte.persistence.job.models.Attempt;
39+
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
3940
import io.airbyte.persistence.job.models.AttemptStatus;
4041
import io.airbyte.persistence.job.models.AttemptWithJobInfo;
4142
import io.airbyte.persistence.job.models.Job;
@@ -625,6 +626,18 @@ public List<AttemptWithJobInfo> listAttemptsWithJobInfo(final ConfigType configT
625626
timeConvertedIntoLocalDateTime)));
626627
}
627628

629+
@Override
630+
public List<AttemptNormalizationStatus> getAttemptNormalizationStatusesForJob(final Long jobId) throws IOException {
631+
return jobDatabase
632+
.query(ctx -> ctx.select(ATTEMPTS.ATTEMPT_NUMBER, SYNC_STATS.RECORDS_COMMITTED, NORMALIZATION_SUMMARIES.FAILURES)
633+
.from(ATTEMPTS)
634+
.join(SYNC_STATS).on(SYNC_STATS.ATTEMPT_ID.eq(ATTEMPTS.ID))
635+
.leftJoin(NORMALIZATION_SUMMARIES).on(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(ATTEMPTS.ID))
636+
.where(ATTEMPTS.JOB_ID.eq(jobId))
637+
.fetch(record -> new AttemptNormalizationStatus(record.get(ATTEMPTS.ATTEMPT_NUMBER),
638+
Optional.of(record.get(SYNC_STATS.RECORDS_COMMITTED)), record.get(NORMALIZATION_SUMMARIES.FAILURES) != null)));
639+
}
640+
628641
// Retrieves only Job information from the record, without any attempt info
629642
private static Job getJobFromRecord(final Record record) {
630643
return new Job(record.get(JOB_ID, Long.class),

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java

+3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.airbyte.config.NormalizationSummary;
1212
import io.airbyte.config.SyncStats;
1313
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
14+
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
1415
import io.airbyte.persistence.job.models.AttemptWithJobInfo;
1516
import io.airbyte.persistence.job.models.Job;
1617
import io.airbyte.persistence.job.models.JobStatus;
@@ -294,4 +295,6 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
294295
*/
295296
void setSchedulerMigrationDone() throws IOException;
296297

298+
/**
 * Fetches the normalization status of the attempts of a job.
 *
 * @param jobId id of the job whose attempts are inspected
 * @return one {@link AttemptNormalizationStatus} per attempt found for the job
 * @throws IOException if the underlying persistence cannot be read
 */
List<AttemptNormalizationStatus> getAttemptNormalizationStatusesForJob(Long jobId) throws IOException;
299+
297300
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.persistence.job.models;
6+
7+
import java.util.Optional;
8+
9+
/**
 * Immutable snapshot of what is known about one attempt when deciding whether normalization has to
 * run.
 *
 * @param attemptNumber number of the attempt within its job
 * @param recordsCommitted count of records committed by the attempt, or empty when no count is
 *        available
 * @param normalizationFailed whether normalization is considered to have failed for the attempt
 */
public record AttemptNormalizationStatus(
                                         long attemptNumber,
                                         Optional<Long> recordsCommitted,
                                         boolean normalizationFailed) {}

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/JobStatus.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public enum JobStatus {
2525
RUNNING, Set.of(INCOMPLETE, SUCCEEDED, FAILED, CANCELLED),
2626
INCOMPLETE, Set.of(PENDING, RUNNING, FAILED, CANCELLED, INCOMPLETE),
2727
SUCCEEDED, Set.of(),
28-
FAILED, Set.of(),
28+
FAILED, Set.of(FAILED),
2929
CANCELLED, Set.of());
3030

3131
}

airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.airbyte.workers.temporal.spec.SpecActivity;
2121
import io.airbyte.workers.temporal.sync.DbtTransformationActivity;
2222
import io.airbyte.workers.temporal.sync.NormalizationActivity;
23+
import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivity;
2324
import io.airbyte.workers.temporal.sync.PersistStateActivity;
2425
import io.airbyte.workers.temporal.sync.ReplicationActivity;
2526
import io.micronaut.context.annotation.Factory;
@@ -101,8 +102,9 @@ public List<Object> syncActivities(
101102
final ReplicationActivity replicationActivity,
102103
final NormalizationActivity normalizationActivity,
103104
final DbtTransformationActivity dbtTransformationActivity,
104-
final PersistStateActivity persistStateActivity) {
105-
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity);
105+
final PersistStateActivity persistStateActivity,
106+
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity) {
107+
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity);
106108
}
107109

108110
@Singleton
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.temporal.sync;
6+
7+
import io.temporal.activity.ActivityInterface;
8+
import io.temporal.activity.ActivityMethod;
9+
import java.io.IOException;
10+
import java.util.Optional;
11+
12+
@ActivityInterface
13+
public interface NormalizationSummaryCheckActivity {
14+
15+
@ActivityMethod
16+
boolean shouldRunNormalization(Long jobId, Long attemptId, Optional<Long> numCommittedRecords) throws IOException;
17+
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.temporal.sync;
6+
7+
import io.airbyte.persistence.job.JobPersistence;
8+
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
9+
import java.io.IOException;
10+
import java.util.Comparator;
11+
import java.util.List;
12+
import java.util.Optional;
13+
import java.util.concurrent.atomic.AtomicBoolean;
14+
import java.util.concurrent.atomic.AtomicLong;
15+
import javax.inject.Singleton;
16+
import lombok.extern.slf4j.Slf4j;
17+
18+
@Slf4j
19+
@Singleton
20+
public class NormalizationSummaryCheckActivityImpl implements NormalizationSummaryCheckActivity {
21+
22+
private final Optional<JobPersistence> jobPersistence;
23+
24+
public NormalizationSummaryCheckActivityImpl(final Optional<JobPersistence> jobPersistence) {
25+
this.jobPersistence = jobPersistence;
26+
}
27+
28+
@Override
29+
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
30+
public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional<Long> numCommittedRecords) throws IOException {
31+
// if job persistence is unavailable, default to running normalization
32+
if (jobPersistence.isEmpty()) {
33+
return true;
34+
}
35+
36+
// if the count of committed records for this attempt is > 0 OR if it is null,
37+
// then we should run normalization
38+
if (numCommittedRecords.get() == null || numCommittedRecords.get() > 0) {
39+
return true;
40+
}
41+
42+
final List<AttemptNormalizationStatus> attemptNormalizationStatuses = jobPersistence.get().getAttemptNormalizationStatusesForJob(jobId);
43+
final AtomicLong totalRecordsCommitted = new AtomicLong(0L);
44+
final AtomicBoolean shouldReturnTrue = new AtomicBoolean(false);
45+
46+
attemptNormalizationStatuses.stream().sorted(Comparator.comparing(AttemptNormalizationStatus::attemptNumber).reversed()).toList()
47+
.forEach(n -> {
48+
if (n.attemptNumber() == attemptNumber) {
49+
return;
50+
}
51+
52+
// if normalization succeeded from a previous attempt succeeded,
53+
// we can stop looking for previous attempts
54+
if (!n.normalizationFailed()) {
55+
return;
56+
}
57+
58+
// if normalization failed on past attempt, add number of records committed on that attempt to total
59+
// committed number
60+
// if there is no data recorded for the number of committed records, we should assume that there
61+
// were committed records and run normalization
62+
if (n.recordsCommitted().isEmpty()) {
63+
shouldReturnTrue.set(true);
64+
return;
65+
} else if (n.recordsCommitted().get() != 0L) {
66+
totalRecordsCommitted.addAndGet(n.recordsCommitted().get());
67+
}
68+
});
69+
70+
if (shouldReturnTrue.get() || totalRecordsCommitted.get() > 0L) {
71+
return true;
72+
}
73+
74+
return false;
75+
76+
}
77+
78+
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java

+23
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
1717
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
1818
import io.temporal.workflow.Workflow;
19+
import java.io.IOException;
20+
import java.util.Optional;
1921
import java.util.UUID;
2022
import javax.inject.Singleton;
2123
import org.slf4j.Logger;
@@ -28,6 +30,8 @@ public class SyncWorkflowImpl implements SyncWorkflow {
2830
private static final Logger LOGGER = LoggerFactory.getLogger(SyncWorkflowImpl.class);
2931
private static final String VERSION_LABEL = "sync-workflow";
3032
private static final int CURRENT_VERSION = 2;
33+
private static final String NORMALIZATION_SUMMARY_CHECK_TAG = "normalization_summary_check";
34+
private static final int NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION = 1;
3135

3236
@TemporalActivityStub(activityOptionsBeanName = "longRunActivityOptions")
3337
private ReplicationActivity replicationActivity;
@@ -37,6 +41,8 @@ public class SyncWorkflowImpl implements SyncWorkflow {
3741
private DbtTransformationActivity dbtTransformationActivity;
3842
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
3943
private PersistStateActivity persistActivity;
44+
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
45+
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
4046

4147
@Override
4248
public StandardSyncOutput run(final JobRunConfig jobRunConfig,
@@ -59,6 +65,23 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
5965
if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) {
6066
for (final StandardSyncOperation standardSyncOperation : syncInput.getOperationSequence()) {
6167
if (standardSyncOperation.getOperatorType() == OperatorType.NORMALIZATION) {
68+
final int normalizationSummaryCheckVersion =
69+
Workflow.getVersion(NORMALIZATION_SUMMARY_CHECK_TAG, Workflow.DEFAULT_VERSION, NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION);
70+
if (normalizationSummaryCheckVersion >= NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION) {
71+
Boolean shouldRun;
72+
try {
73+
shouldRun = normalizationSummaryCheckActivity.shouldRunNormalization(Long.valueOf(jobRunConfig.getJobId()), jobRunConfig.getAttemptId(),
74+
Optional.of(syncOutput.getStandardSyncSummary().getTotalStats().getRecordsCommitted()));
75+
} catch (final IOException e) {
76+
shouldRun = true;
77+
}
78+
if (!shouldRun) {
79+
LOGGER.info("Skipping normalization because there are no records to normalize.");
80+
break;
81+
}
82+
}
83+
84+
LOGGER.info("generating normalization input");
6285
final NormalizationInput normalizationInput = generateNormalizationInput(syncInput, syncOutput);
6386
final NormalizationSummary normalizationSummary =
6487
normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.temporal.scheduling.activities;
6+
7+
import static org.mockito.Mockito.mock;
8+
9+
import io.airbyte.persistence.job.JobPersistence;
10+
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
11+
import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivityImpl;
12+
import java.io.IOException;
13+
import java.util.List;
14+
import java.util.Optional;
15+
import lombok.extern.slf4j.Slf4j;
16+
import org.assertj.core.api.Assertions;
17+
import org.junit.jupiter.api.BeforeAll;
18+
import org.junit.jupiter.api.Test;
19+
import org.junit.jupiter.api.extension.ExtendWith;
20+
import org.mockito.Mockito;
21+
import org.mockito.junit.jupiter.MockitoExtension;
22+
23+
@Slf4j
24+
@ExtendWith(MockitoExtension.class)
25+
class NormalizationSummaryCheckActivityTest {
26+
27+
private static final Long JOB_ID = 10L;
28+
static private NormalizationSummaryCheckActivityImpl normalizationSummaryCheckActivity;
29+
static private JobPersistence mJobPersistence;
30+
31+
@BeforeAll
32+
static void setUp() {
33+
mJobPersistence = mock(JobPersistence.class);
34+
normalizationSummaryCheckActivity = new NormalizationSummaryCheckActivityImpl(Optional.of(mJobPersistence));
35+
}
36+
37+
@Test
38+
void testShouldRunNormalizationRecordsCommittedOnFirstAttemptButNotCurrentAttempt() throws IOException {
39+
// Attempt 1 committed records, but normalization failed
40+
// Attempt 2 did not commit records, normalization failed (or did not run)
41+
final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), true);
42+
final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true);
43+
Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2));
44+
45+
Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L)));
46+
}
47+
48+
@Test
49+
void testShouldRunNormalizationRecordsCommittedOnCurrentAttempt() throws IOException {
50+
Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(30L)));
51+
}
52+
53+
@Test
54+
void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptOrPreviousAttempts() throws IOException {
55+
// No attempts committed any records
56+
// Normalization did not run on any attempts
57+
final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(0L), true);
58+
final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true);
59+
Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2));
60+
Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L)));
61+
}
62+
63+
@Test
64+
void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptPreviousAttemptsSucceeded() throws IOException {
65+
// Records committed on first two attempts and normalization succeeded
66+
// No records committed on current attempt and normalization has not yet run
67+
final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), false);
68+
final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(20L), false);
69+
Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2));
70+
Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L)));
71+
}
72+
73+
}

0 commit comments

Comments
 (0)