Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/complete first sync #7832

Merged
merged 35 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
4362719
Add a column in the workspace document
benmoriceau Nov 9, 2021
0d97aa4
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/com…
benmoriceau Nov 9, 2021
036600c
Tmp
benmoriceau Nov 9, 2021
c0bfad6
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/com…
benmoriceau Nov 10, 2021
961d7cf
Tmp
benmoriceau Nov 10, 2021
13719c5
Add a way to do a bulk write of the update of a config
benmoriceau Nov 11, 2021
1c64f6f
tmp
benmoriceau Nov 11, 2021
89a4c9d
tmp
benmoriceau Nov 11, 2021
c81fc22
Tmp
benmoriceau Nov 12, 2021
436698b
Add test
benmoriceau Nov 12, 2021
b3bbee1
Add Job submitter Test
benmoriceau Nov 12, 2021
e4911b7
format
benmoriceau Nov 12, 2021
cb135fc
Clean up
benmoriceau Nov 12, 2021
37d23e8
Format
benmoriceau Nov 12, 2021
1050a49
Add initialization
benmoriceau Nov 12, 2021
4bd8cd3
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/com…
benmoriceau Nov 12, 2021
7b1c71c
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/com…
benmoriceau Nov 15, 2021
d6c8e6d
auto generated file
benmoriceau Nov 15, 2021
bd6e018
Tmp
benmoriceau Nov 15, 2021
e734032
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/com…
benmoriceau Nov 15, 2021
f023440
Test
benmoriceau Nov 15, 2021
cd99107
remove all new fields values assignations
benmoriceau Nov 15, 2021
9683f8b
Fix errors
benmoriceau Nov 15, 2021
ddd3148
Remove unwanted change
benmoriceau Nov 15, 2021
a406e3f
Update and add to the new fields to the API
benmoriceau Nov 16, 2021
16e3658
format
benmoriceau Nov 16, 2021
66ad859
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/com…
benmoriceau Nov 17, 2021
e04a3c1
PR comments
benmoriceau Nov 17, 2021
6764c31
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/com…
benmoriceau Nov 17, 2021
834edca
Another PR comment
benmoriceau Nov 17, 2021
ab01578
PR comments and test
benmoriceau Nov 17, 2021
478f466
Format
benmoriceau Nov 17, 2021
cfb08ff
Fix test
benmoriceau Nov 17, 2021
6f8d79f
PR comments
benmoriceau Nov 17, 2021
8c62f2e
format
benmoriceau Nov 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,10 @@ components:
type: array
items:
$ref: "#/components/schemas/Notification"
firstCompletedSync:
type: boolean
feedbackDone:
type: boolean
WorkspaceUpdate:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public <T> void writeConfig(final AirbyteConfig configType, final String configI
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

@Override
public void deleteConfig(final AirbyteConfig configType, final String configId) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ properties:
type: array
items:
"$ref": Notification.yaml
firstCompletedSync:
type: boolean
feedbackDone:
type: boolean
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public interface ConfigPersistence {

<T> void writeConfig(AirbyteConfig configType, String configId, T config) throws JsonValidationException, IOException;

<T> void writeConfigs(AirbyteConfig configType, Map<String, T> configs) throws IOException, JsonValidationException;

void deleteConfig(AirbyteConfig configType, String configId) throws ConfigNotFoundException, IOException;

void replaceAllConfigs(Map<AirbyteConfig, Stream<?>> configs, boolean dryRun) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ public StandardSourceDefinition getSourceDefinitionFromConnection(final UUID con
}
}

public StandardWorkspace getStandardWorkspaceFromConnection(final UUID connectionId, final boolean isTombstone) {
try {
final StandardSync sync = getStandardSync(connectionId);
final SourceConnection source = getSourceConnection(sync.getSourceId());
return getStandardWorkspace(source.getWorkspaceId(), isTombstone);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

public List<StandardSourceDefinition> listStandardSourceDefinitions() throws JsonValidationException, IOException {
return persistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -146,18 +147,33 @@ public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConf

@Override
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws IOException {
final Map<String, T> configIdToConfig = new HashMap<>() {

{
put(configId, config);
}

};
writeConfigs(configType, configIdToConfig);
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs) throws IOException {
database.transaction(ctx -> {
final boolean isExistingConfig = ctx.fetchExists(select()
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId)));
configs.forEach((configId, config) -> {
final boolean isExistingConfig = ctx.fetchExists(select()
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId)));

final OffsetDateTime timestamp = OffsetDateTime.now();
final OffsetDateTime timestamp = OffsetDateTime.now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should go outside of the loop so all of the inserted records have the same timestamp for the transaction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


if (isExistingConfig) {
updateConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configId);
} else {
insertConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configType.getIdFieldName());
}
if (isExistingConfig) {
updateConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configId);
} else {
insertConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config),
configType.getIdFieldName());
}
});

return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ public <T> void writeConfig(final AirbyteConfig configType, final String configI
}
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs) throws IOException {
synchronized (lock) {
for (final Map.Entry<String, T> config : configs.entrySet()) {
writeConfigInternal(configType, config.getKey(), config.getValue());
}
}
}

private <T> void writeConfigs(final AirbyteConfig configType, final Stream<T> configs, final Path rootOverride) {
configs.forEach(config -> {
final String configId = configType.getId(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
Expand Down Expand Up @@ -59,8 +60,25 @@ public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConf

@Override
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws JsonValidationException, IOException {
validateJson(Jsons.jsonNode(config), configType);
decoratedPersistence.writeConfig(configType, configId, config);

final Map<String, T> configIdToConfig = new HashMap<>() {

{
put(configId, config);
}

};

writeConfigs(configType, configIdToConfig);
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs)
throws IOException, JsonValidationException {
for (final Map.Entry<String, T> config : configs.entrySet()) {
validateJson(Jsons.jsonNode(config.getValue()), configType);
}
decoratedPersistence.writeConfigs(configType, configs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.db.Database;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.Record1;
Expand Down Expand Up @@ -88,6 +90,13 @@ protected static void writeDestination(final ConfigPersistence configPersistence
configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination);
}

protected static void writeDestinations(final ConfigPersistence configPersistence, final List<StandardDestinationDefinition> destinations)
throws Exception {
final Map<String, StandardDestinationDefinition> destinationsByID = destinations.stream()
.collect(Collectors.toMap(destinationDefinition -> destinationDefinition.getDestinationDefinitionId().toString(), Function.identity()));
configPersistence.writeConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationsByID);
}

protected static void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
throws Exception {
configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package io.airbyte.config.persistence;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -32,6 +34,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class ConfigRepositoryTest {

Expand All @@ -45,7 +49,7 @@ void setup() {
configPersistence = mock(ConfigPersistence.class);
final var secretPersistence = new MemorySecretPersistence();
configRepository =
new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence));
spy(new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence)));
}

@AfterEach
Expand Down Expand Up @@ -74,6 +78,35 @@ void assertReturnsWorkspace(final StandardWorkspace workspace) throws ConfigNotF
assertEquals(workspace, configRepository.getStandardWorkspace(WORKSPACE_ID, true));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWorkspaceByConnectionId(final boolean isTombstone) throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardWorkspace workspace = new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withTombstone(isTombstone);

final UUID connectionId = UUID.randomUUID();
final UUID sourceId = UUID.randomUUID();
final StandardSync mSync = new StandardSync()
.withSourceId(sourceId);
final SourceConnection mSourceConnection = new SourceConnection()
.withWorkspaceId(WORKSPACE_ID);
final StandardWorkspace mWorkflow = new StandardWorkspace()
.withWorkspaceId(WORKSPACE_ID);

doReturn(mSync)
.when(configRepository)
.getStandardSync(connectionId);
doReturn(mSourceConnection)
.when(configRepository)
.getSourceConnection(sourceId);
doReturn(mWorkflow)
.when(configRepository)
.getStandardWorkspace(WORKSPACE_ID, isTombstone);

configRepository.getStandardWorkspaceFromConnection(connectionId, isTombstone);

verify(configRepository).getStandardWorkspace(WORKSPACE_ID, isTombstone);
}

@Test
void testGetConnectionState() throws Exception {
final UUID connectionId = UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static org.mockito.Mockito.spy;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
Expand Down Expand Up @@ -53,6 +54,17 @@ void tearDown() throws Exception {
database.close();
}

@Test
public void testMultiWriteAndGetConfig() throws Exception {
writeDestinations(configPersistence, Lists.newArrayList(DESTINATION_S3, DESTINATION_SNOWFLAKE));
assertRecordCount(2);
assertHasDestination(DESTINATION_S3);
assertHasDestination(DESTINATION_SNOWFLAKE);
assertEquals(
List.of(DESTINATION_SNOWFLAKE, DESTINATION_S3),
configPersistence.listConfigs(STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class));
}

@Test
public void testWriteAndGetConfig() throws Exception {
writeDestination(configPersistence, DESTINATION_S3);
Expand All @@ -67,10 +79,10 @@ public void testWriteAndGetConfig() throws Exception {

@Test
public void testListConfigWithMetadata() throws Exception {
Instant now = Instant.now().minus(Duration.ofSeconds(1));
final Instant now = Instant.now().minus(Duration.ofSeconds(1));
writeDestination(configPersistence, DESTINATION_S3);
writeDestination(configPersistence, DESTINATION_SNOWFLAKE);
List<ConfigWithMetadata<StandardDestinationDefinition>> configWithMetadata = configPersistence
final List<ConfigWithMetadata<StandardDestinationDefinition>> configWithMetadata = configPersistence
.listConfigsWithMetadata(STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class);
assertEquals(2, configWithMetadata.size());
assertEquals("STANDARD_DESTINATION_DEFINITION", configWithMetadata.get(0).getConfigType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -58,6 +60,34 @@ void testReadWriteConfig() throws IOException, JsonValidationException, ConfigNo
StandardSourceDefinition.class));
}

@Test
void testReadWriteConfigs() throws IOException, JsonValidationException, ConfigNotFoundException {
final Map<String, StandardSourceDefinition> sourceDefinitionById = new HashMap<>() {

{
put(UUID_1.toString(), SOURCE_1);
put(UUID_2.toString(), SOURCE_2);
}

};

configPersistence.writeConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefinitionById);

assertEquals(
SOURCE_1,
configPersistence.getConfig(
ConfigSchema.STANDARD_SOURCE_DEFINITION,
UUID_1.toString(),
StandardSourceDefinition.class));

assertEquals(
SOURCE_2,
configPersistence.getConfig(
ConfigSchema.STANDARD_SOURCE_DEFINITION,
UUID_2.toString(),
StandardSourceDefinition.class));
}

@Test
void testListConfigs() throws JsonValidationException, IOException {
configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), SOURCE_1);
Expand Down
Loading