Temporal per stream resets #13990

Merged
38 commits merged on Jun 28, 2022

Commits (38)
fcef5bb
remove reset flags from workflow state + refactor
lmossman Jun 17, 2022
02a49ae
bring back cancelledForReset, since we need to distinguish between th…
lmossman Jun 17, 2022
be5e2df
delete reset job streams on cancel or success
lmossman Jun 17, 2022
44c9516
extract isResetJob to method
lmossman Jun 21, 2022
2ca63a8
merge with master
lmossman Jun 24, 2022
840be33
set sync modes on streams in reset job correctly
lmossman Jun 21, 2022
be5a3dd
format
lmossman Jun 21, 2022
e9fec17
Add test for getAllStreamsForConnection
alovew Jun 21, 2022
b4514d2
fix tests
lmossman Jun 21, 2022
77dc4f4
update more tests
lmossman Jun 22, 2022
4e671d2
add StreamResetActivityTests
alovew Jun 22, 2022
fe6268f
fix tests for default job creator
lmossman Jun 22, 2022
1c349b3
remove outdated comment
lmossman Jun 22, 2022
7344761
remove debug lines
lmossman Jun 22, 2022
343d339
remove unused enum value
lmossman Jun 22, 2022
98650f6
fix tests
alovew Jun 23, 2022
9656438
fix constant equals ordering
lmossman Jun 24, 2022
d182a07
make job mock not static
lmossman Jun 24, 2022
0a1d7fe
DRY and add comments
lmossman Jun 24, 2022
985efa8
add comment about deleted streams
lmossman Jun 24, 2022
eebcfc9
Remove io.airbyte.config.StreamDescriptor
alovew Jun 24, 2022
bde575c
register stream reset activity impl
lmossman Jun 24, 2022
45b6285
refetch connection workflow when checking job id, since it may have b…
lmossman Jun 24, 2022
1e4b69c
only cancel if workflow is running, to allow reset signal to always s…
lmossman Jun 24, 2022
0a617ae
fix reset signal to use new doneWaiting workflow state prop
lmossman Jun 24, 2022
85047f0
try to fix tests
lmossman Jun 25, 2022
703dce1
fix reset cancel case
lmossman Jun 28, 2022
a8a9d76
add acceptance test for resetting while sync is running
lmossman Jun 28, 2022
09b9621
format
lmossman Jun 28, 2022
8cc4fcd
fix new acceptance test
lmossman Jun 28, 2022
b1ee243
lower sleep on test
lmossman Jun 28, 2022
42a4ddc
raise sleep
lmossman Jun 28, 2022
e9d2ca3
increase sleep and timeout, and remove repeated test
lmossman Jun 28, 2022
573a0b4
merge with master
lmossman Jun 28, 2022
82a7c3b
use CatalogHelpers to extract stream descriptors
lmossman Jun 28, 2022
f58a3b6
raise sleep and timeout to prevent transient failures
lmossman Jun 28, 2022
2e4db98
merge with master again
lmossman Jun 28, 2022
a94977a
format
lmossman Jun 28, 2022
Files changed
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ properties:
streamsToReset:
type: array
items:
"$ref": StreamDescriptor.yaml
type: object
existingJavaType: io.airbyte.protocol.models.StreamDescriptor
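
For reference, a minimal sketch of what this schema change means on the Java side: streamsToReset is now a list of protocol-level StreamDescriptor objects rather than a config-local type. The stream names below are made up, and the io.airbyte.config package for the generated ResetSourceConfiguration class is an assumption for illustration.

import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.protocol.models.StreamDescriptor;
import java.util.List;

// Hypothetical example; not part of this PR.
class ResetSourceConfigurationExample {

  public static void main(String[] args) {
    // Each entry is a protocol StreamDescriptor (name plus an optional namespace).
    final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration()
        .withStreamsToReset(List.of(
            new StreamDescriptor().withName("users").withNamespace("public"),
            new StreamDescriptor().withName("orders")));
    System.out.println(resetSourceConfiguration);
  }
}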

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.time.OffsetDateTime;
Expand Down Expand Up @@ -981,4 +983,10 @@ public void writeWorkspaceServiceAccountNoSecrets(final WorkspaceServiceAccount
workspaceServiceAccount);
}

public List<StreamDescriptor> getAllStreamsForConnection(final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync standardSync = getStandardSync(connectionId);
return CatalogHelpers.extractStreamDescriptors(standardSync.getCatalog());
}

}
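
As a rough sketch of how this helper can feed the per-stream reset API introduced elsewhere in this PR; the handler-level wiring shown here is an assumption, not code from the diff.

import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.util.List;
import java.util.UUID;

// Hypothetical helper illustrating the intended flow; not part of this PR.
class ResetAllStreamsExample {

  static ManualOperationResult resetAllStreams(final ConfigRepository configRepository,
                                               final EventRunner eventRunner,
                                               final UUID connectionId) throws Exception {
    // Every stream currently configured on the connection becomes a reset target.
    final List<StreamDescriptor> streamsToReset = configRepository.getAllStreamsForConnection(connectionId);
    // The per-stream reset API added in this PR takes the explicit list of streams to reset.
    return eventRunner.resetConnection(connectionId, streamsToReset);
  }
}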
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

import static org.jooq.impl.DSL.noCondition;

import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.StreamResetRecord;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.protocol.models.StreamDescriptor;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.List;
Expand Down Expand Up @@ -38,7 +38,7 @@ public StreamResetPersistence(final Database database) {
}

/*
* Get a list of streamDescriptors for streams that have pending or running resets
* Get a list of StreamDescriptors for streams that have pending or running resets
*/
public List<StreamDescriptor> getStreamResets(final UUID connectionId) throws IOException {
return database.query(ctx -> ctx.select(DSL.asterisk())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.db.Database;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.sql.SQLException;
Expand Down Expand Up @@ -431,4 +435,31 @@ void testHealthCheckFailure() throws SQLException {
assertFalse(check);
}

@Test
void testGetAllStreamsForConnection() throws Exception {
final UUID connectionId = UUID.randomUUID();
final AirbyteStream airbyteStream = new AirbyteStream().withName("stream1").withNamespace("namespace1");
final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream().withStream(airbyteStream);
final AirbyteStream airbyteStream2 = new AirbyteStream().withName("stream2");
final ConfiguredAirbyteStream configuredStream2 = new ConfiguredAirbyteStream().withStream(airbyteStream2);
final ConfiguredAirbyteCatalog configuredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream, configuredStream2));

final StandardSync sync = new StandardSync()
.withCatalog(configuredCatalog);
doReturn(sync)
.when(configRepository)
.getStandardSync(connectionId);

final List<StreamDescriptor> result = configRepository.getAllStreamsForConnection(connectionId);
assertEquals(2, result.size());
Contributor commented:
Nit: I don't want to block this review on this, and this and this are not applicable here because the test is already using the Jupiter assertions.
AssertJ would make such a test easier to understand and would give you a meaningful diff. It would look like:

Assertions.assertThat(result).hasSize(2);
// create expectedStreamDescriptor and expectedStreamDescriptor2 here
Assertions.assertThat(result).containsOnly(expectedStreamDescriptor, expectedStreamDescriptor2);


assertTrue(
result.stream().anyMatch(
streamDescriptor -> streamDescriptor.getName().equals("stream1") && streamDescriptor.getNamespace().equals("namespace1")));
assertTrue(
result.stream().anyMatch(
streamDescriptor -> streamDescriptor.getName().equals("stream2") && streamDescriptor.getNamespace() == null));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;

import io.airbyte.config.StreamDescriptor;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.test.utils.DatabaseConnectionHelper;
import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

package io.airbyte.scheduler.client;

import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.util.List;
import java.util.Set;
import java.util.UUID;

Expand All @@ -16,9 +18,9 @@ public interface EventRunner {

ManualOperationResult startNewCancellation(final UUID connectionId);

ManualOperationResult resetConnection(final UUID connectionId);
ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset);

ManualOperationResult synchronousResetConnection(final UUID connectionId);
ManualOperationResult synchronousResetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset);

void deleteConnection(final UUID connectionId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

package io.airbyte.scheduler.client;

import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import lombok.AllArgsConstructor;
Expand All @@ -27,12 +29,12 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
return temporalClient.startNewCancellation(connectionId);
}

public ManualOperationResult resetConnection(final UUID connectionId) {
return temporalClient.resetConnection(connectionId);
public ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
return temporalClient.resetConnection(connectionId, streamsToReset);
}

public ManualOperationResult synchronousResetConnection(final UUID connectionId) {
return temporalClient.synchronousResetConnection(connectionId);
public ManualOperationResult synchronousResetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
return temporalClient.synchronousResetConnection(connectionId, streamsToReset);
}

public void deleteConnection(final UUID connectionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.State;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -87,13 +88,6 @@ public Optional<Long> createSyncJob(final SourceConnection source,
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig);
}

// Strategy:
// 1. Set all streams to full refresh - overwrite.
// 2. Create a job where the source emits no records.
// 3. Run a sync from the empty source to the destination. This will overwrite all data for each
// stream in the destination.
// 4. The Empty source emits no state message, so state will start at null (i.e. start from the
// beginning on the next sync).
@Override
public Optional<Long> createResetConnectionJob(final DestinationConnection destination,
final StandardSync standardSync,
Expand All @@ -103,8 +97,19 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti
throws IOException {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = standardSync.getCatalog();
configuredAirbyteCatalog.getStreams().forEach(configuredAirbyteStream -> {
final StreamDescriptor streamDescriptor = CatalogHelpers.extractDescriptor(configuredAirbyteStream);
configuredAirbyteStream.setSyncMode(SyncMode.FULL_REFRESH);
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
if (streamsToReset.contains(streamDescriptor)) {
// The Reset Source will emit no record messages for any streams, so setting the destination sync
// mode to OVERWRITE will empty out this stream in the destination.
// Note: streams in streamsToReset that are NOT in this configured catalog (i.e. deleted streams)
// will still have their state reset by the Reset Source, but will not be modified in the
// destination since they are not present in the catalog that is sent to the destination.
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
} else {
// Set streams that are not being reset to APPEND so that they are not modified in the destination
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND);
}
});
final JobResetConnectionConfig resetConnectionConfig = new JobResetConnectionConfig()
.withNamespaceDefinition(standardSync.getNamespaceDefinition())
Expand Down
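
To make the OVERWRITE versus APPEND split concrete, here is a small self-contained sketch of the sync-mode assignment this hunk performs, using made-up stream names; it mirrors the loop above but is not code from the PR.

import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.util.List;

class ResetSyncModeExample {

  public static void main(String[] args) {
    // Two-stream catalog; only "users" is being reset.
    final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
        new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("users")),
        new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("orders"))));
    final List<StreamDescriptor> streamsToReset = List.of(new StreamDescriptor().withName("users"));

    catalog.getStreams().forEach(stream -> {
      final StreamDescriptor descriptor = CatalogHelpers.extractDescriptor(stream);
      stream.setSyncMode(SyncMode.FULL_REFRESH);
      // Streams being reset are emptied in the destination (OVERWRITE, with no records emitted);
      // all other streams are left untouched in the destination (APPEND, also with no records emitted).
      stream.setDestinationSyncMode(
          streamsToReset.contains(descriptor) ? DestinationSyncMode.OVERWRITE : DestinationSyncMode.APPEND);
    });

    // Expected outcome: users -> FULL_REFRESH/OVERWRITE, orders -> FULL_REFRESH/APPEND
    catalog.getStreams().forEach(s -> System.out.println(
        s.getStream().getName() + " -> " + s.getSyncMode() + "/" + s.getDestinationSyncMode()));
  }
}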
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.protocol.models.StreamDescriptor;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.State;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.protocol.models.CatalogHelpers;
Expand All @@ -41,8 +40,9 @@
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -52,8 +52,13 @@

public class DefaultJobCreatorTest {

private static final String STREAM_NAME = "users";
private static final String STREAM1_NAME = "stream1";
private static final String STREAM2_NAME = "stream2";
private static final String STREAM3_NAME = "stream3";
private static final String NAMESPACE = "namespace";
private static final String FIELD_NAME = "id";
private static final StreamDescriptor STREAM1_DESCRIPTOR = new StreamDescriptor().withName(STREAM1_NAME);
private static final StreamDescriptor STREAM2_DESCRIPTOR = new StreamDescriptor().withName(STREAM2_NAME).withNamespace(NAMESPACE);

private static final String SOURCE_IMAGE_NAME = "daxtarity/sourceimagename";
private static final String DESTINATION_IMAGE_NAME = "daxtarity/destinationimagename";
Expand All @@ -62,8 +67,6 @@ public class DefaultJobCreatorTest {
private static final StandardSync STANDARD_SYNC;
private static final StandardSyncOperation STANDARD_SYNC_OPERATION;
private static final long JOB_ID = 12L;
private static final StreamDescriptor STREAM_DESCRIPTOR1 = new StreamDescriptor().withName("stream 1").withNamespace("namespace 1");
private static final StreamDescriptor STREAM_DESCRIPTOR2 = new StreamDescriptor().withName("stream 2").withNamespace("namespace 2");

private JobPersistence jobPersistence;
private StatePersistence statePersistence;
Expand Down Expand Up @@ -97,13 +100,17 @@ public class DefaultJobCreatorTest {
.withConfiguration(implementationJson)
.withTombstone(false);

final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)));
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(stream));

final UUID connectionId = UUID.randomUUID();
final UUID operationId = UUID.randomUUID();

final ConfiguredAirbyteStream stream1 = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)));
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)));
final ConfiguredAirbyteStream stream3 = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM3_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)));
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream1, stream2, stream3));

STANDARD_SYNC = new StandardSync()
.withConnectionId(connectionId)
.withName("presto to hudi")
Expand Down Expand Up @@ -329,12 +336,21 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {

@Test
void testCreateResetConnectionJob() throws IOException {
final ConfiguredAirbyteCatalog expectedCatalog = STANDARD_SYNC.getCatalog();
expectedCatalog.getStreams()
.forEach(configuredAirbyteStream -> {
configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH);
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
});
final List<StreamDescriptor> streamsToReset = List.of(STREAM1_DESCRIPTOR, STREAM2_DESCRIPTOR);
final ConfiguredAirbyteCatalog expectedCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE),
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE),
// this stream is not being reset, so it should have APPEND destination sync mode
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM3_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.APPEND)));

final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val")));
when(statePersistence.getCurrentState(STANDARD_SYNC.getConnectionId()))
Expand All @@ -349,7 +365,7 @@ void testCreateResetConnectionJob() throws IOException {
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)))
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset))
.withState(connectionState);

final JobConfig jobConfig = new JobConfig()
Expand All @@ -364,7 +380,7 @@ void testCreateResetConnectionJob() throws IOException {
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2));
streamsToReset);

verify(jobPersistence).enqueueJob(expectedScope, jobConfig);
assertTrue(jobId.isPresent());
Expand All @@ -373,12 +389,21 @@ void testCreateResetConnectionJob() throws IOException {

@Test
void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
final ConfiguredAirbyteCatalog expectedCatalog = STANDARD_SYNC.getCatalog();
expectedCatalog.getStreams()
.forEach(configuredAirbyteStream -> {
configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH);
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
});
final List<StreamDescriptor> streamsToReset = List.of(STREAM1_DESCRIPTOR, STREAM2_DESCRIPTOR);
final ConfiguredAirbyteCatalog expectedCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE),
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE),
// this stream is not being reset, so it should have APPEND destination sync mode
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM3_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.APPEND)));

final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val")));
when(statePersistence.getCurrentState(STANDARD_SYNC.getConnectionId()))
Expand All @@ -393,7 +418,7 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)))
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset))
.withState(connectionState);

final JobConfig jobConfig = new JobConfig()
Expand All @@ -408,7 +433,7 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2));
streamsToReset);

verify(jobPersistence).enqueueJob(expectedScope, jobConfig);
assertTrue(jobId.isEmpty());
Expand Down