From e8f249ee8a52bc7d75573563e168b7e83f88630f Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 28 Jan 2022 16:36:07 -0800 Subject: [PATCH 1/2] add proper retry policy --- .../main/java/io/airbyte/config/Configs.java | 16 ++++++++++++++++ .../java/io/airbyte/config/EnvConfigs.java | 17 +++++++++++++++++ .../workers/temporal/TemporalUtils.java | 9 +++++++++ .../ConnectionManagerWorkflowImpl.java | 8 ++++---- .../shared/ActivityConfiguration.java | 18 ++++++++++++------ .../temporal/sync/SyncWorkflowImpl.java | 8 +------- 6 files changed, 59 insertions(+), 17 deletions(-) diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 297744faaf138..55d2ffcce22a0 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -8,6 +8,7 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.storage.CloudStorageConfigs; import java.nio.file.Path; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Set; @@ -332,6 +333,21 @@ public interface Configs { */ boolean getContainerOrchestratorEnabled(); + /** + * Get the longest duration of non long running activity + */ + int getMaxActivityTimeoutSecond(); + + /** + * Get the duration in second between 2 activity attempts + */ + int getDelayBetweenActivityAttemps(); + + /** + * Get number of attempts of the non long running activities + */ + int getActivityNumberOfAttempt(); + enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 540bc7b05b74f..e9042aa53e517 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -100,6 +100,10 @@ public class EnvConfigs implements Configs { public static final String STATE_STORAGE_GCS_BUCKET_NAME = "STATE_STORAGE_GCS_BUCKET_NAME"; public static final String STATE_STORAGE_GCS_APPLICATION_CREDENTIALS = "STATE_STORAGE_GCS_APPLICATION_CREDENTIALS"; + public static final String ACTIVITY_MAX_TIMEOUT_SECOND = "ACTIVITY_MAX_TIMEOUT_SECOND"; + public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT"; + public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS"; + // defaults private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache"; public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default"; @@ -567,6 +571,19 @@ public boolean getContainerOrchestratorEnabled() { return getEnvOrDefault(CONTAINER_ORCHESTRATOR_ENABLED, false, Boolean::valueOf); } + @Override + public int getMaxActivityTimeoutSecond() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30")); + } + + @Override public int getDelayBetweenActivityAttemps() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30")); + } + + @Override public int getActivityNumberOfAttempt() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "3")); + } + // Helpers public String getEnvOrDefault(final String key, final String defaultValue) { return getEnvOrDefault(key, defaultValue, Function.identity(), false); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index 8e16f02539203..9f8876250aac5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -7,6 +7,8 @@ import static java.util.stream.Collectors.toSet; import io.airbyte.commons.lang.Exceptions; +import io.airbyte.config.Configs; +import io.airbyte.config.EnvConfigs; import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.activity.Activity; import io.temporal.api.common.v1.WorkflowExecution; @@ -58,6 +60,13 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo public static final RetryOptions NO_RETRY = RetryOptions.newBuilder().setMaximumAttempts(1).build(); + private static final Configs configs = new EnvConfigs(); + public static final RetryOptions RETRY = RetryOptions.newBuilder() + .setMaximumAttempts(configs.getActivityNumberOfAttempt()) + .setInitialInterval(Duration.ofSeconds(configs.getDelayBetweenActivityAttemps())) + .build(); + + public static final String DEFAULT_NAMESPACE = "default"; private static final Duration WORKFLOW_EXECUTION_TTL = Duration.ofDays(7); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index e316419b08174..e5ea934883714 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -62,12 +62,12 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow final Set failures = new HashSet<>(); Boolean partialSuccess = null; - private final GenerateInputActivity getSyncInputActivity = Workflow.newActivityStub(GenerateInputActivity.class, ActivityConfiguration.OPTIONS); + private final GenerateInputActivity getSyncInputActivity = Workflow.newActivityStub(GenerateInputActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private final JobCreationAndStatusUpdateActivity jobCreationAndStatusUpdateActivity = - Workflow.newActivityStub(JobCreationAndStatusUpdateActivity.class, ActivityConfiguration.OPTIONS); - private final ConfigFetchActivity configFetchActivity = Workflow.newActivityStub(ConfigFetchActivity.class, ActivityConfiguration.OPTIONS); + Workflow.newActivityStub(JobCreationAndStatusUpdateActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); + private final ConfigFetchActivity configFetchActivity = Workflow.newActivityStub(ConfigFetchActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private final ConnectionDeletionActivity connectionDeletionActivity = - Workflow.newActivityStub(ConnectionDeletionActivity.class, ActivityConfiguration.OPTIONS); + Workflow.newActivityStub(ConnectionDeletionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private CancellationScope syncWorkflowCancellationScope; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index 169c327ed6d1b..b94fc8e4b3025 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -4,11 +4,13 @@ package io.airbyte.workers.temporal.scheduling.shared; +import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityCancellationType; import io.temporal.activity.ActivityOptions; import java.time.Duration; +import java.time.temporal.ChronoUnit; /** * Shared temporal workflow configuration in order to ensure that @@ -18,13 +20,10 @@ */ public class ActivityConfiguration { - private static final int MAX_SYNC_TIMEOUT_DAYS = new EnvConfigs().getSyncJobMaxTimeoutDays(); + private static final Configs configs = new EnvConfigs(); - public static final ActivityOptions OPTIONS = ActivityOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) - .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(TemporalUtils.NO_RETRY) - .build(); + private static final int MAX_SYNC_TIMEOUT_DAYS = configs.getSyncJobMaxTimeoutDays(); + private static final Duration DB_INTERACTION_TIMEOUT = Duration.ofSeconds(configs.getMaxActivityTimeoutSecond()); public static final ActivityOptions LONG_RUN_OPTIONS = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) @@ -35,4 +34,11 @@ public class ActivityConfiguration { .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); + public static final ActivityOptions SHORT_ACTIVITY_OPTIONS = ActivityOptions.newBuilder() + .setStartToCloseTimeout(DB_INTERACTION_TIMEOUT) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .setRetryOptions(TemporalUtils.RETRY) + .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) + .build(); + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 9a113993313ba..d7813ab2bed75 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -26,18 +26,12 @@ public class SyncWorkflowImpl implements SyncWorkflow { private static final String VERSION_LABEL = "sync-workflow"; private static final int CURRENT_VERSION = 1; - private static final ActivityOptions persistOptions = ActivityConfiguration.OPTIONS.toBuilder() - .setRetryOptions(RetryOptions.newBuilder() - .setMaximumAttempts(10) - .build()) - .build(); - private final ReplicationActivity replicationActivity = Workflow.newActivityStub(ReplicationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS); private final NormalizationActivity normalizationActivity = Workflow.newActivityStub(NormalizationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS); private final DbtTransformationActivity dbtTransformationActivity = Workflow.newActivityStub(DbtTransformationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS); - private final PersistStateActivity persistActivity = Workflow.newActivityStub(PersistStateActivity.class, persistOptions); + private final PersistStateActivity persistActivity = Workflow.newActivityStub(PersistStateActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); @Override public StandardSyncOutput run(final JobRunConfig jobRunConfig, From 5971ba7142f0a7fd23666828bc09ae095f7bade6 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 31 Jan 2022 09:22:20 -0800 Subject: [PATCH 2/2] format --- .../models/src/main/java/io/airbyte/config/Configs.java | 1 - .../models/src/main/java/io/airbyte/config/EnvConfigs.java | 6 ++++-- .../java/io/airbyte/workers/temporal/TemporalUtils.java | 1 - .../temporal/scheduling/ConnectionManagerWorkflowImpl.java | 6 ++++-- .../temporal/scheduling/shared/ActivityConfiguration.java | 3 ++- .../io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java | 5 ++--- 6 files changed, 12 insertions(+), 10 deletions(-) diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 55d2ffcce22a0..44ac0c844b69d 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -8,7 +8,6 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.storage.CloudStorageConfigs; import java.nio.file.Path; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Set; diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index e9042aa53e517..d33ea493f0e54 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -576,11 +576,13 @@ public int getMaxActivityTimeoutSecond() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30")); } - @Override public int getDelayBetweenActivityAttemps() { + @Override + public int getDelayBetweenActivityAttemps() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30")); } - @Override public int getActivityNumberOfAttempt() { + @Override + public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "3")); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index 9f8876250aac5..70d7a34f681e9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -66,7 +66,6 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo .setInitialInterval(Duration.ofSeconds(configs.getDelayBetweenActivityAttemps())) .build(); - public static final String DEFAULT_NAMESPACE = "default"; private static final Duration WORKFLOW_EXECUTION_TTL = Duration.ofDays(7); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index e5ea934883714..4dc9a6d887ffd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -62,10 +62,12 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow final Set failures = new HashSet<>(); Boolean partialSuccess = null; - private final GenerateInputActivity getSyncInputActivity = Workflow.newActivityStub(GenerateInputActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); + private final GenerateInputActivity getSyncInputActivity = + Workflow.newActivityStub(GenerateInputActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private final JobCreationAndStatusUpdateActivity jobCreationAndStatusUpdateActivity = Workflow.newActivityStub(JobCreationAndStatusUpdateActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); - private final ConfigFetchActivity configFetchActivity = Workflow.newActivityStub(ConfigFetchActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); + private final ConfigFetchActivity configFetchActivity = + Workflow.newActivityStub(ConfigFetchActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private final ConnectionDeletionActivity connectionDeletionActivity = Workflow.newActivityStub(ConnectionDeletionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index b94fc8e4b3025..59e585708287c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -10,7 +10,6 @@ import io.temporal.activity.ActivityCancellationType; import io.temporal.activity.ActivityOptions; import java.time.Duration; -import java.time.temporal.ChronoUnit; /** * Shared temporal workflow configuration in order to ensure that @@ -35,7 +34,9 @@ public class ActivityConfiguration { .build(); public static final ActivityOptions SHORT_ACTIVITY_OPTIONS = ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(DB_INTERACTION_TIMEOUT) .setStartToCloseTimeout(DB_INTERACTION_TIMEOUT) + .setScheduleToStartTimeout(DB_INTERACTION_TIMEOUT) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) .setRetryOptions(TemporalUtils.RETRY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index d7813ab2bed75..1af3f2880dc81 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -13,8 +13,6 @@ import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.temporal.scheduling.shared.ActivityConfiguration; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; import io.temporal.workflow.Workflow; import java.util.UUID; import org.slf4j.Logger; @@ -31,7 +29,8 @@ public class SyncWorkflowImpl implements SyncWorkflow { Workflow.newActivityStub(NormalizationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS); private final DbtTransformationActivity dbtTransformationActivity = Workflow.newActivityStub(DbtTransformationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS); - private final PersistStateActivity persistActivity = Workflow.newActivityStub(PersistStateActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); + private final PersistStateActivity persistActivity = + Workflow.newActivityStub(PersistStateActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); @Override public StandardSyncOutput run(final JobRunConfig jobRunConfig,