Skip to content

Commit

Permalink
Retry if the failure
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Jan 14, 2022
1 parent ec80634 commit fa42598
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.scheduling;

import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.exception.RetryableException;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
Expand Down Expand Up @@ -121,6 +122,10 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
syncWorkflowInputs.getDestinationLauncherConfig(),
syncWorkflowInputs.getSyncInput(),
connectionId));

if (standardSyncOutput.get().getStandardSyncSummary().getStatus() == ReplicationStatus.FAILED) {
workflowState.setFailed(true);
}
} catch (final ChildWorkflowFailure childWorkflowFailure) {
if (!(childWorkflowFailure.getCause() instanceof CanceledFailure)) {
throw childWorkflowFailure;
Expand All @@ -146,6 +151,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
} else if (workflowState.isCancelled()) {
jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(
maybeJobId.get()));
} else if (workflowState.isFailed()) {
reportFailure(connectionUpdaterInput);
} else {
// report success
reportSuccess(connectionUpdaterInput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
private boolean skipScheduling = false;
private boolean updated = false;
private boolean cancelled = false;
private boolean failed = false;

public void setRunning(final boolean running) {
final ChangedStateEvent event = new ChangedStateEvent(
Expand Down Expand Up @@ -68,6 +69,14 @@ public void setCancelled(final boolean cancelled) {
this.cancelled = cancelled;
}

public void setFailed(final boolean failed) {
final ChangedStateEvent event = new ChangedStateEvent(
StateField.FAILED,
failed);
stateChangedListener.addEvent(id, event);
this.failed = failed;
}

public void reset() {
this.setRunning(false);
this.setDeleted(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ enum StateField {
DELETED,
RUNNING,
SKIPPED_SCHEDULING,
UPDATED
UPDATED,
FAILED
}

@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.temporal.exception.RetryableException;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
Expand Down Expand Up @@ -151,6 +152,7 @@ public void setJobSuccess() throws IOException {
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobNotifier).successJob(Mockito.any());
Mockito.verify(mJobtracker).trackSync(Mockito.any(), JobState.SUCCEEDED);
}

@Test
Expand Down

0 comments on commit fa42598

Please sign in to comment.