Skip to content

Commit

Permalink
Expose cron scheduling in the Connections APIs (#15253)
Browse files Browse the repository at this point in the history
* Expose cron scheduling in the Connections APIs

* Update airbyte-api/src/main/openapi/config.yaml

Co-authored-by: terencecho <terence@airbyte.io>

* Update airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java

Co-authored-by: terencecho <terence@airbyte.io>

* update octavia-cli tests for new schedule schema, and fix update API impl

* check for null schedule data before updating

* handle new schedule related fields in generate / apply / import

* update octavia-cli changelog

* ensure that legacy manual schedule flag is consistent with schedule_type

* update octavia cli test coverage for new schedule schema

* fix failing octavia cli integration tests

* fix file diff check

* Update octavia-cli/unit_tests/test_apply/test_resources.py

Co-authored-by: Augustin <augustin.lafanechere@gmail.com>

Co-authored-by: terencecho <terence@airbyte.io>
Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
  • Loading branch information
3 people authored Aug 11, 2022
1 parent f664bc9 commit 294ee8f
Show file tree
Hide file tree
Showing 26 changed files with 14,939 additions and 12,893 deletions.
74 changes: 74 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3218,6 +3218,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
Expand Down Expand Up @@ -3257,6 +3261,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
Expand Down Expand Up @@ -3298,6 +3306,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
Expand Down Expand Up @@ -3335,6 +3347,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
Expand Down Expand Up @@ -3386,6 +3402,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
Expand Down Expand Up @@ -3416,6 +3436,10 @@ components:
$ref: "#/components/schemas/DestinationId"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
source:
Expand Down Expand Up @@ -3445,6 +3469,10 @@ components:
$ref: "#/components/schemas/DestinationId"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
source:
Expand All @@ -3467,6 +3495,8 @@ components:
- active
- inactive
- deprecated
# TODO(https://github.com/airbytehq/airbyte/issues/11432): remove.
# Prefer the ConnectionScheduleType and ConnectionScheduleData properties.
ConnectionSchedule:
description: if null, then no schedule is set.
type: object
Expand All @@ -3485,6 +3515,46 @@ components:
- days
- weeks
- months
ConnectionScheduleType:
description: determine how the schedule data should be interpreted
type: string
enum:
- manual
- basic
- cron
ConnectionScheduleData:
description: schedule for when the the connection should run, per the schedule type
type: object
properties:
# This should be populated when schedule type is basic.
basicSchedule:
type: object
required:
- timeUnit
- units
properties:
timeUnit:
type: string
enum:
- minutes
- hours
- days
- weeks
- months
units:
type: integer
format: int64
# This should be populated when schedule type is cron.
cron:
type: object
required:
- cronExpression
- cronTimeZone
properties:
cronExpression:
type: string
cronTimeZone:
type: string
NamespaceDefinitionType:
type: string
description: Method used for computing final namespace in destination
Expand Down Expand Up @@ -4564,6 +4634,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
operationIds:
Expand Down
1 change: 1 addition & 0 deletions airbyte-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
implementation 'org.glassfish.jersey.inject:jersey-hk2'
implementation 'org.glassfish.jersey.media:jersey-media-json-jackson'
implementation 'org.glassfish.jersey.ext:jersey-bean-validation'
implementation 'org.quartz-scheduler:quartz:2.3.2'


testImplementation project(':airbyte-test-utils')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

package io.airbyte.server.converters;

import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.model.generated.ActorDefinitionResourceRequirements;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionSchedule;
import io.airbyte.api.model.generated.ConnectionScheduleData;
import io.airbyte.api.model.generated.ConnectionScheduleDataBasicSchedule;
import io.airbyte.api.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.JobType;
Expand All @@ -17,7 +21,10 @@
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.server.handlers.helpers.ConnectionScheduleHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.util.stream.Collectors;

public class ApiPojoConverters {
Expand Down Expand Up @@ -78,7 +85,7 @@ public static ResourceRequirements resourceRequirementsToApi(final io.airbyte.co
.memoryLimit(resourceReqs.getMemoryLimit());
}

public static io.airbyte.config.StandardSync connectionUpdateToInternal(final ConnectionUpdate update) {
public static io.airbyte.config.StandardSync connectionUpdateToInternal(final ConnectionUpdate update) throws JsonValidationException {

final StandardSync newConnection = new StandardSync()
.withNamespaceDefinition(Enums.convertTo(update.getNamespaceDefinition(), NamespaceDefinitionType.class))
Expand All @@ -99,7 +106,9 @@ public static io.airbyte.config.StandardSync connectionUpdateToInternal(final Co
}

// update sync schedule
if (update.getSchedule() != null) {
if (update.getScheduleType() != null) {
ConnectionScheduleHelper.populateSyncFromScheduleTypeAndData(newConnection, update.getScheduleType(), update.getScheduleData());
} else if (update.getSchedule() != null) {
final Schedule newSchedule = new Schedule()
.withTimeUnit(toPersistenceTimeUnit(update.getSchedule().getTimeUnit()))
.withUnits(update.getSchedule().getUnits());
Expand All @@ -112,21 +121,12 @@ public static io.airbyte.config.StandardSync connectionUpdateToInternal(final Co
}

public static ConnectionRead internalToConnectionRead(final StandardSync standardSync) {
ConnectionSchedule apiSchedule = null;

if (!standardSync.getManual()) {
apiSchedule = new ConnectionSchedule()
.timeUnit(toApiTimeUnit(standardSync.getSchedule().getTimeUnit()))
.units(standardSync.getSchedule().getUnits());
}

final ConnectionRead connectionRead = new ConnectionRead()
.connectionId(standardSync.getConnectionId())
.sourceId(standardSync.getSourceId())
.destinationId(standardSync.getDestinationId())
.operationIds(standardSync.getOperationIds())
.status(toApiStatus(standardSync.getStatus()))
.schedule(apiSchedule)
.name(standardSync.getName())
.namespaceDefinition(Enums.convertTo(standardSync.getNamespaceDefinition(), io.airbyte.api.model.generated.NamespaceDefinitionType.class))
.namespaceFormat(standardSync.getNamespaceFormat())
Expand All @@ -138,6 +138,8 @@ public static ConnectionRead internalToConnectionRead(final StandardSync standar
connectionRead.resourceRequirements(resourceRequirementsToApi(standardSync.getResourceRequirements()));
}

populateConnectionReadSchedule(standardSync, connectionRead);

return connectionRead;
}

Expand All @@ -149,10 +151,15 @@ public static io.airbyte.config.JobTypeResourceLimit.JobType toInternalJobType(f
return Enums.convertTo(jobType, io.airbyte.config.JobTypeResourceLimit.JobType.class);
}

// TODO(https://github.com/airbytehq/airbyte/issues/11432): remove these helpers.
public static ConnectionSchedule.TimeUnitEnum toApiTimeUnit(final Schedule.TimeUnit apiTimeUnit) {
return Enums.convertTo(apiTimeUnit, ConnectionSchedule.TimeUnitEnum.class);
}

public static ConnectionSchedule.TimeUnitEnum toApiTimeUnit(final BasicSchedule.TimeUnit timeUnit) {
return Enums.convertTo(timeUnit, ConnectionSchedule.TimeUnitEnum.class);
}

public static ConnectionStatus toApiStatus(final StandardSync.Status status) {
return Enums.convertTo(status, ConnectionStatus.class);
}
Expand All @@ -169,4 +176,75 @@ public static BasicSchedule.TimeUnit toBasicScheduleTimeUnit(final ConnectionSch
return Enums.convertTo(apiTimeUnit, BasicSchedule.TimeUnit.class);
}

public static BasicSchedule.TimeUnit toBasicScheduleTimeUnit(final ConnectionScheduleDataBasicSchedule.TimeUnitEnum apiTimeUnit) {
return Enums.convertTo(apiTimeUnit, BasicSchedule.TimeUnit.class);
}

public static ConnectionScheduleDataBasicSchedule.TimeUnitEnum toApiBasicScheduleTimeUnit(final BasicSchedule.TimeUnit timeUnit) {
return Enums.convertTo(timeUnit, ConnectionScheduleDataBasicSchedule.TimeUnitEnum.class);
}

public static ConnectionScheduleDataBasicSchedule.TimeUnitEnum toApiBasicScheduleTimeUnit(final Schedule.TimeUnit timeUnit) {
return Enums.convertTo(timeUnit, ConnectionScheduleDataBasicSchedule.TimeUnitEnum.class);
}

public static void populateConnectionReadSchedule(final StandardSync standardSync, final ConnectionRead connectionRead) {
// TODO(https://github.com/airbytehq/airbyte/issues/11432): only return new schema once frontend is
// ready.
if (standardSync.getScheduleType() != null) {
// Populate everything based on the new schema.
switch (standardSync.getScheduleType()) {
case MANUAL -> {
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.MANUAL);
}
case BASIC_SCHEDULE -> {
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.BASIC);
connectionRead.scheduleData(new ConnectionScheduleData()
.basicSchedule(new ConnectionScheduleDataBasicSchedule()
.timeUnit(toApiBasicScheduleTimeUnit(standardSync.getScheduleData().getBasicSchedule().getTimeUnit()))
.units(standardSync.getScheduleData().getBasicSchedule().getUnits())));
connectionRead.schedule(new ConnectionSchedule()
.timeUnit(toApiTimeUnit(standardSync.getScheduleData().getBasicSchedule().getTimeUnit()))
.units(standardSync.getScheduleData().getBasicSchedule().getUnits()));
}
case CRON -> {
// We don't populate any legacy data here.
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.CRON);
connectionRead.scheduleData(new ConnectionScheduleData()
.cron(new ConnectionScheduleDataCron()
.cronExpression(standardSync.getScheduleData().getCron().getCronExpression())
.cronTimeZone(standardSync.getScheduleData().getCron().getCronTimeZone())));
}
}
} else if (standardSync.getManual()) {
// Legacy schema, manual sync.
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.MANUAL);
} else {
// Legacy schema, basic schedule.
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.BASIC);
connectionRead.schedule(new ConnectionSchedule()
.timeUnit(toApiTimeUnit(standardSync.getSchedule().getTimeUnit()))
.units(standardSync.getSchedule().getUnits()));
connectionRead.scheduleData(new ConnectionScheduleData()
.basicSchedule(new ConnectionScheduleDataBasicSchedule()
.timeUnit(toApiBasicScheduleTimeUnit(standardSync.getSchedule().getTimeUnit()))
.units(standardSync.getSchedule().getUnits())));
}
}

public static ConnectionScheduleType toApiScheduleType(final ScheduleType scheduleType) {
switch (scheduleType) {
case MANUAL -> {
return ConnectionScheduleType.MANUAL;
}
case BASIC_SCHEDULE -> {
return ConnectionScheduleType.BASIC;
}
case CRON -> {
return ConnectionScheduleType.CRON;
}
}
throw new RuntimeException("Unexpected schedule type");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.airbyte.server.converters.CatalogDiffConverters;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.server.handlers.helpers.ConnectionMatcher;
import io.airbyte.server.handlers.helpers.ConnectionScheduleHelper;
import io.airbyte.server.handlers.helpers.DestinationMatcher;
import io.airbyte.server.handlers.helpers.SourceMatcher;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -140,6 +141,34 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList()));
}

if (connectionCreate.getSchedule() != null && connectionCreate.getScheduleType() != null) {
throw new JsonValidationException("supply old or new schedule schema but not both");
}

if (connectionCreate.getScheduleType() != null) {
ConnectionScheduleHelper.populateSyncFromScheduleTypeAndData(standardSync, connectionCreate.getScheduleType(),
connectionCreate.getScheduleData());
} else {
populateSyncFromLegacySchedule(standardSync, connectionCreate);
}

configRepository.writeStandardSync(standardSync);

trackNewConnection(standardSync);

try {
LOGGER.info("Starting a connection manager workflow");
eventRunner.createConnectionManagerWorkflow(connectionId);
} catch (final Exception e) {
LOGGER.error("Start of the connection manager workflow failed", e);
configRepository.deleteStandardSyncDefinition(standardSync.getConnectionId());
throw e;
}

return buildConnectionRead(connectionId);
}

private void populateSyncFromLegacySchedule(final StandardSync standardSync, final ConnectionCreate connectionCreate) {
if (connectionCreate.getSchedule() != null) {
final Schedule schedule = new Schedule()
.withTimeUnit(ApiPojoConverters.toPersistenceTimeUnit(connectionCreate.getSchedule().getTimeUnit()))
Expand All @@ -159,21 +188,6 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
standardSync.withManual(true);
standardSync.withScheduleType(ScheduleType.MANUAL);
}

configRepository.writeStandardSync(standardSync);

trackNewConnection(standardSync);

try {
LOGGER.info("Starting a connection manager workflow");
eventRunner.createConnectionManagerWorkflow(connectionId);
} catch (final Exception e) {
LOGGER.error("Start of the connection manager workflow failed", e);
configRepository.deleteStandardSyncDefinition(standardSync.getConnectionId());
throw e;
}

return buildConnectionRead(connectionId);
}

private void trackNewConnection(final StandardSync standardSync) {
Expand Down
Loading

0 comments on commit 294ee8f

Please sign in to comment.