Skip to content

Commit

Permalink
Retry if the failure
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Jan 14, 2022
1 parent da1a5f7 commit 2319305
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package io.airbyte.workers.temporal.scheduling;

import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.exception.RetryableException;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
Expand Down Expand Up @@ -83,12 +85,14 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> {
final JobCreationOutput jobCreationOutput = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(
connectionUpdaterInput.getConnectionId()));
connectionUpdaterInput.setJobId(jobCreationOutput.getJobId());
return Optional.ofNullable(jobCreationOutput.getJobId());
});

maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> {
final AttemptCreationOutput attemptCreationOutput = jobCreationAndStatusUpdateActivity.createNewAttempt(new AttemptCreationInput(
jobId));
connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId());
return attemptCreationOutput.getAttemptId();
}));

Expand Down Expand Up @@ -121,6 +125,12 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
syncWorkflowInputs.getDestinationLauncherConfig(),
syncWorkflowInputs.getSyncInput(),
connectionId));

StandardSyncSummary standardSyncSummary = standardSyncOutput.get().getStandardSyncSummary();

if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) {
workflowState.setFailed(true);
}
} catch (final ChildWorkflowFailure childWorkflowFailure) {
if (!(childWorkflowFailure.getCause() instanceof CanceledFailure)) {
throw childWorkflowFailure;
Expand All @@ -146,6 +156,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
} else if (workflowState.isCancelled()) {
jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(
maybeJobId.get()));
} else if (workflowState.isFailed()) {
reportFailure(connectionUpdaterInput);
} else {
// report success
reportSuccess(connectionUpdaterInput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
private boolean skipScheduling = false;
private boolean updated = false;
private boolean cancelled = false;
private boolean failed = false;

public void setRunning(final boolean running) {
final ChangedStateEvent event = new ChangedStateEvent(
Expand Down Expand Up @@ -68,12 +69,21 @@ public void setCancelled(final boolean cancelled) {
this.cancelled = cancelled;
}

public void setFailed(final boolean failed) {
final ChangedStateEvent event = new ChangedStateEvent(
StateField.FAILED,
failed);
stateChangedListener.addEvent(id, event);
this.failed = failed;
}

public void reset() {
this.setRunning(false);
this.setDeleted(false);
this.setSkipScheduling(false);
this.setUpdated(false);
this.setCancelled(false);
this.setFailed(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ enum StateField {
DELETED,
RUNNING,
SKIPPED_SCHEDULING,
UPDATED
UPDATED,
FAILED
}

@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.temporal.exception.RetryableException;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
Expand Down Expand Up @@ -151,6 +152,7 @@ public void setJobSuccess() throws IOException {
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobNotifier).successJob(Mockito.any());
Mockito.verify(mJobtracker).trackSync(Mockito.any(), Mockito.eq(JobState.SUCCEEDED));
}

@Test
Expand Down

0 comments on commit 2319305

Please sign in to comment.