From 23193056a9ecbe2edaa523c5ac64cb5a49c034db Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 14 Jan 2022 10:30:46 -0800 Subject: [PATCH] Retry if the failure --- .../scheduling/ConnectionManagerWorkflowImpl.java | 12 ++++++++++++ .../temporal/scheduling/state/WorkflowState.java | 10 ++++++++++ .../state/listener/WorkflowStateChangedListener.java | 3 ++- .../JobCreationAndStatusUpdateActivityTest.java | 2 ++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 71aef0ce15adc..66a754d64d1ca 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -5,6 +5,8 @@ package io.airbyte.workers.temporal.scheduling; import io.airbyte.config.StandardSyncOutput; +import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.StandardSyncSummary.ReplicationStatus; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.exception.RetryableException; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; @@ -83,12 +85,14 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> { final JobCreationOutput jobCreationOutput = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput( connectionUpdaterInput.getConnectionId())); + connectionUpdaterInput.setJobId(jobCreationOutput.getJobId()); return Optional.ofNullable(jobCreationOutput.getJobId()); }); maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> { final AttemptCreationOutput attemptCreationOutput = jobCreationAndStatusUpdateActivity.createNewAttempt(new AttemptCreationInput( jobId)); + connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId()); return attemptCreationOutput.getAttemptId(); })); @@ -121,6 +125,12 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr syncWorkflowInputs.getDestinationLauncherConfig(), syncWorkflowInputs.getSyncInput(), connectionId)); + + StandardSyncSummary standardSyncSummary = standardSyncOutput.get().getStandardSyncSummary(); + + if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) { + workflowState.setFailed(true); + } } catch (final ChildWorkflowFailure childWorkflowFailure) { if (!(childWorkflowFailure.getCause() instanceof CanceledFailure)) { throw childWorkflowFailure; @@ -146,6 +156,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr } else if (workflowState.isCancelled()) { jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput( maybeJobId.get())); + } else if (workflowState.isFailed()) { + reportFailure(connectionUpdaterInput); } else { // report success reportSuccess(connectionUpdaterInput); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java index c5b0242c62372..971a2023c709a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java @@ -27,6 +27,7 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan private boolean skipScheduling = false; private boolean updated = false; private boolean cancelled = false; + private boolean failed = false; public void setRunning(final boolean running) { final ChangedStateEvent event = new ChangedStateEvent( @@ -68,12 +69,21 @@ public void setCancelled(final boolean cancelled) { this.cancelled = cancelled; } + public void setFailed(final boolean failed) { + final ChangedStateEvent event = new ChangedStateEvent( + StateField.FAILED, + failed); + stateChangedListener.addEvent(id, event); + this.failed = failed; + } + public void reset() { this.setRunning(false); this.setDeleted(false); this.setSkipScheduling(false); this.setUpdated(false); this.setCancelled(false); + this.setFailed(false); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java index 0e7301e975335..049f92ab4bb37 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java @@ -27,7 +27,8 @@ enum StateField { DELETED, RUNNING, SKIPPED_SCHEDULING, - UPDATED + UPDATED, + FAILED } @Value diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 43c37a82a85f5..f863d3298e14c 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -16,6 +16,7 @@ import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; +import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; import io.airbyte.workers.temporal.exception.RetryableException; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput; @@ -151,6 +152,7 @@ public void setJobSuccess() throws IOException { Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput); Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID); Mockito.verify(mJobNotifier).successJob(Mockito.any()); + Mockito.verify(mJobtracker).trackSync(Mockito.any(), Mockito.eq(JobState.SUCCEEDED)); } @Test