diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 297744faaf138..44ac0c844b69d 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -332,6 +332,21 @@ public interface Configs { */ boolean getContainerOrchestratorEnabled(); + /** + * Get the longest duration of non long running activity + */ + int getMaxActivityTimeoutSecond(); + + /** + * Get the duration in second between 2 activity attempts + */ + int getDelayBetweenActivityAttemps(); + + /** + * Get number of attempts of the non long running activities + */ + int getActivityNumberOfAttempt(); + enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 540bc7b05b74f..d33ea493f0e54 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -100,6 +100,10 @@ public class EnvConfigs implements Configs { public static final String STATE_STORAGE_GCS_BUCKET_NAME = "STATE_STORAGE_GCS_BUCKET_NAME"; public static final String STATE_STORAGE_GCS_APPLICATION_CREDENTIALS = "STATE_STORAGE_GCS_APPLICATION_CREDENTIALS"; + public static final String ACTIVITY_MAX_TIMEOUT_SECOND = "ACTIVITY_MAX_TIMEOUT_SECOND"; + public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT"; + public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS"; + // defaults private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache"; public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default"; @@ -567,6 +571,21 @@ public boolean getContainerOrchestratorEnabled() { return getEnvOrDefault(CONTAINER_ORCHESTRATOR_ENABLED, false, Boolean::valueOf); } + @Override + public int getMaxActivityTimeoutSecond() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30")); + } + + @Override + public int getDelayBetweenActivityAttemps() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30")); + } + + @Override + public int getActivityNumberOfAttempt() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "3")); + } + // Helpers public String getEnvOrDefault(final String key, final String defaultValue) { return getEnvOrDefault(key, defaultValue, Function.identity(), false); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index 8e16f02539203..70d7a34f681e9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -7,6 +7,8 @@ import static java.util.stream.Collectors.toSet; import io.airbyte.commons.lang.Exceptions; +import io.airbyte.config.Configs; +import io.airbyte.config.EnvConfigs; import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.activity.Activity; import io.temporal.api.common.v1.WorkflowExecution; @@ -58,6 +60,12 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo public static final RetryOptions NO_RETRY = RetryOptions.newBuilder().setMaximumAttempts(1).build(); + private static final Configs configs = new EnvConfigs(); + public static final RetryOptions RETRY = RetryOptions.newBuilder() + .setMaximumAttempts(configs.getActivityNumberOfAttempt()) + .setInitialInterval(Duration.ofSeconds(configs.getDelayBetweenActivityAttemps())) + .build(); + public static final String DEFAULT_NAMESPACE = "default"; private static final Duration WORKFLOW_EXECUTION_TTL = Duration.ofDays(7); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index e316419b08174..4dc9a6d887ffd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -62,12 +62,14 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow final Set failures = new HashSet<>(); Boolean partialSuccess = null; - private final GenerateInputActivity getSyncInputActivity = Workflow.newActivityStub(GenerateInputActivity.class, ActivityConfiguration.OPTIONS); + private final GenerateInputActivity getSyncInputActivity = + Workflow.newActivityStub(GenerateInputActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private final JobCreationAndStatusUpdateActivity jobCreationAndStatusUpdateActivity = - Workflow.newActivityStub(JobCreationAndStatusUpdateActivity.class, ActivityConfiguration.OPTIONS); - private final ConfigFetchActivity configFetchActivity = Workflow.newActivityStub(ConfigFetchActivity.class, ActivityConfiguration.OPTIONS); + Workflow.newActivityStub(JobCreationAndStatusUpdateActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); + private final ConfigFetchActivity configFetchActivity = + Workflow.newActivityStub(ConfigFetchActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private final ConnectionDeletionActivity connectionDeletionActivity = - Workflow.newActivityStub(ConnectionDeletionActivity.class, ActivityConfiguration.OPTIONS); + Workflow.newActivityStub(ConnectionDeletionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private CancellationScope syncWorkflowCancellationScope; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index 169c327ed6d1b..59e585708287c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.scheduling.shared; +import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityCancellationType; @@ -18,13 +19,10 @@ */ public class ActivityConfiguration { - private static final int MAX_SYNC_TIMEOUT_DAYS = new EnvConfigs().getSyncJobMaxTimeoutDays(); + private static final Configs configs = new EnvConfigs(); - public static final ActivityOptions OPTIONS = ActivityOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) - .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(TemporalUtils.NO_RETRY) - .build(); + private static final int MAX_SYNC_TIMEOUT_DAYS = configs.getSyncJobMaxTimeoutDays(); + private static final Duration DB_INTERACTION_TIMEOUT = Duration.ofSeconds(configs.getMaxActivityTimeoutSecond()); public static final ActivityOptions LONG_RUN_OPTIONS = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) @@ -35,4 +33,13 @@ public class ActivityConfiguration { .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); + public static final ActivityOptions SHORT_ACTIVITY_OPTIONS = ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(DB_INTERACTION_TIMEOUT) + .setStartToCloseTimeout(DB_INTERACTION_TIMEOUT) + .setScheduleToStartTimeout(DB_INTERACTION_TIMEOUT) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .setRetryOptions(TemporalUtils.RETRY) + .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) + .build(); + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 9a113993313ba..1af3f2880dc81 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -13,8 +13,6 @@ import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.temporal.scheduling.shared.ActivityConfiguration; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; import io.temporal.workflow.Workflow; import java.util.UUID; import org.slf4j.Logger; @@ -26,18 +24,13 @@ public class SyncWorkflowImpl implements SyncWorkflow { private static final String VERSION_LABEL = "sync-workflow"; private static final int CURRENT_VERSION = 1; - private static final ActivityOptions persistOptions = ActivityConfiguration.OPTIONS.toBuilder() - .setRetryOptions(RetryOptions.newBuilder() - .setMaximumAttempts(10) - .build()) - .build(); - private final ReplicationActivity replicationActivity = Workflow.newActivityStub(ReplicationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS); private final NormalizationActivity normalizationActivity = Workflow.newActivityStub(NormalizationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS); private final DbtTransformationActivity dbtTransformationActivity = Workflow.newActivityStub(DbtTransformationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS); - private final PersistStateActivity persistActivity = Workflow.newActivityStub(PersistStateActivity.class, persistOptions); + private final PersistStateActivity persistActivity = + Workflow.newActivityStub(PersistStateActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); @Override public StandardSyncOutput run(final JobRunConfig jobRunConfig,