Skip to content

Commit 2ac3274

Browse files
authored
Heartbeat for long running activity (#9852)
* Heartbeat for long running activity * PR comments
1 parent 2a015a8 commit 2ac3274

File tree

4 files changed

+13
-11
lines changed

4 files changed

+13
-11
lines changed

airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java

-6
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,6 @@ public void load() throws Exception {
123123

124124
jobPersistence.setVersion(currAirbyteVersion.serialize());
125125
LOGGER.info("Set version to {}", currAirbyteVersion);
126-
127-
if (featureFlags.usesNewScheduler()) {
128-
LOGGER.info("Start cleaning zombie jobs");
129-
cleanupZombies(jobPersistence);
130-
LOGGER.info("Cleaning zombie jobs done");
131-
}
132126
}
133127

134128
if (postLoadExecution != null) {

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java

+9
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,13 @@ public class ActivityConfiguration {
2626
.setRetryOptions(TemporalUtils.NO_RETRY)
2727
.build();
2828

29+
public static final ActivityOptions LONG_RUN_OPTIONS = ActivityOptions.newBuilder()
30+
.setScheduleToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS))
31+
.setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS))
32+
.setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS))
33+
.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
34+
.setRetryOptions(TemporalUtils.NO_RETRY)
35+
.setHeartbeatTimeout(Duration.ofSeconds(30))
36+
.build();
37+
2938
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ public class SyncWorkflowImpl implements SyncWorkflow {
3232
.build())
3333
.build();
3434

35-
private final ReplicationActivity replicationActivity = Workflow.newActivityStub(ReplicationActivity.class, ActivityConfiguration.OPTIONS);
36-
private final NormalizationActivity normalizationActivity = Workflow.newActivityStub(NormalizationActivity.class, ActivityConfiguration.OPTIONS);
35+
private final ReplicationActivity replicationActivity = Workflow.newActivityStub(ReplicationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS);
36+
private final NormalizationActivity normalizationActivity =
37+
Workflow.newActivityStub(NormalizationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS);
3738
private final DbtTransformationActivity dbtTransformationActivity =
38-
Workflow.newActivityStub(DbtTransformationActivity.class, ActivityConfiguration.OPTIONS);
39+
Workflow.newActivityStub(DbtTransformationActivity.class, ActivityConfiguration.LONG_RUN_OPTIONS);
3940
private final PersistStateActivity persistActivity = Workflow.newActivityStub(PersistStateActivity.class, persistOptions);
4041

4142
@Override

docker-compose.yaml

-2
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ services:
3131
- DATABASE_URL=${DATABASE_URL}
3232
- DATABASE_USER=${DATABASE_USER}
3333
- LOG_LEVEL=${LOG_LEVEL}
34-
- NEW_SCHEDULER=${NEW_SCHEDULER}
3534
- RUN_DATABASE_MIGRATION_ON_STARTUP=${RUN_DATABASE_MIGRATION_ON_STARTUP}
36-
- TEMPORAL_HOST=${TEMPORAL_HOST}
3735
db:
3836
image: airbyte/db:${VERSION}
3937
logging: *default-logging

0 commit comments

Comments
 (0)