Skip to content

Commit

Permalink
Revert "Remove cyclic dead code in ConfigPersistence" (#18939)
Browse files Browse the repository at this point in the history
* Revert "Remove unused interfaces (#18880)"

This reverts commit bf9cd2b.

* Compat fix for code that has been extracted

* Format
  • Loading branch information
gosusnp authored Nov 3, 2022
1 parent 0109f5d commit c7f9935
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.config.persistence;

import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
Expand All @@ -20,6 +21,11 @@ public interface ConfigPersistence {

<T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) throws JsonValidationException, IOException;

<T> ConfigWithMetadata<T> getConfigWithMetadata(AirbyteConfig configType, String configId, Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException;

<T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(AirbyteConfig configType, Class<T> clazz) throws JsonValidationException, IOException;

<T> void writeConfig(AirbyteConfig configType, String configId, T config) throws JsonValidationException, IOException;

<T> void writeConfigs(AirbyteConfig configType, Map<String, T> configs) throws IOException, JsonValidationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,57 @@ private <T> void validate(final String configId, final List<ConfigWithMetadata<T
}
}

private <T> ConfigWithMetadata<T> validateAndReturn(final String configId,
final List<ConfigWithMetadata<T>> result,
final AirbyteConfig airbyteConfig)
throws ConfigNotFoundException {
validate(configId, result, airbyteConfig);
return result.get(0);
}

@Override
public <T> List<T> listConfigs(final AirbyteConfig configType, final Class<T> clazz) throws JsonValidationException, IOException {
final List<T> config = new ArrayList<>();
listConfigsWithMetadata(configType, clazz).forEach(c -> config.add(c.getConfig()));
return config;
}

// listConfigWithMetadata seems to be unused at this point.
// It is only called by listConfigs and it only reads the config. The "metadata" part seems to be
// unused.
@Override
public <T> ConfigWithMetadata<T> getConfigWithMetadata(final AirbyteConfig configType, final String configId, final Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException {
final Optional<UUID> configIdOpt = Optional.of(UUID.fromString(configId));
if (configType == ConfigSchema.STANDARD_WORKSPACE) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listStandardWorkspaceWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listStandardSourceDefinitionWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listStandardDestinationDefinitionWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.SOURCE_CONNECTION) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listSourceConnectionWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.DESTINATION_CONNECTION) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listDestinationConnectionWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.SOURCE_OAUTH_PARAM) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listSourceOauthParamWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.DESTINATION_OAUTH_PARAM) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listDestinationOauthParamWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.STANDARD_SYNC_OPERATION) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listStandardSyncOperationWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.STANDARD_SYNC) {
throw buildUseStandardSyncPersistenceException();
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listStandardSyncStateWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogFetchEventWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listWorkspaceServiceAccountWithMetadata(configIdOpt), configType);
} else {
throw new IllegalArgumentException(UNKNOWN_CONFIG_TYPE + configType);
}
}

@Override
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConfig configType, final Class<T> clazz) throws IOException {
final List<ConfigWithMetadata<T>> configWithMetadata = new ArrayList<>();
if (configType == ConfigSchema.STANDARD_WORKSPACE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public StandardSyncPersistence(final Database database) {
}

public StandardSync getStandardSync(final UUID connectionId) throws IOException, ConfigNotFoundException {
return getStandardSyncWithMetadata(connectionId).getConfig();
}

public ConfigWithMetadata<StandardSync> getStandardSyncWithMetadata(final UUID connectionId) throws IOException, ConfigNotFoundException {
final List<ConfigWithMetadata<StandardSync>> result = listStandardSyncWithMetadata(Optional.of(connectionId));

final boolean foundMoreThanOneConfig = result.size() > 1;
Expand All @@ -47,11 +51,15 @@ public StandardSync getStandardSync(final UUID connectionId) throws IOException,
} else if (foundMoreThanOneConfig) {
throw new IllegalStateException(String.format("Multiple %s configs found for ID %s: %s", ConfigSchema.STANDARD_SYNC, connectionId, result));
}
return result.get(0).getConfig();
return result.get(0);
}

public List<StandardSync> listStandardSync() throws IOException {
return listStandardSyncWithMetadata(Optional.empty()).stream().map(ConfigWithMetadata::getConfig).toList();
return listStandardSyncWithMetadata().stream().map(ConfigWithMetadata::getConfig).toList();
}

public List<ConfigWithMetadata<StandardSync>> listStandardSyncWithMetadata() throws IOException {
return listStandardSyncWithMetadata(Optional.empty());
}

public void writeStandardSync(final StandardSync standardSync) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -50,6 +51,24 @@ public <T> List<T> listConfigs(final AirbyteConfig configType, final Class<T> cl
return configs;
}

@Override
public <T> ConfigWithMetadata<T> getConfigWithMetadata(final AirbyteConfig configType, final String configId, final Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException {
final ConfigWithMetadata<T> config = decoratedPersistence.getConfigWithMetadata(configType, configId, clazz);
validateJson(config.getConfig(), configType);
return config;
}

@Override
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConfig configType, final Class<T> clazz)
throws JsonValidationException, IOException {
final List<ConfigWithMetadata<T>> configs = decoratedPersistence.listConfigsWithMetadata(configType, clazz);
for (final ConfigWithMetadata<T> config : configs) {
validateJson(config.getConfig(), configType);
}
return configs;
}

@Override
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws JsonValidationException, IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ void test() throws JsonValidationException, IOException, ConfigNotFoundException
}

private void checkSafeguards() {
final String anyString = "";
final String anyString = "00000000-0000-0000-0000-000000000000";

// Making sure that the objects that have been migrated out of config persistence are protected with
// an explicit error.
assertThrows(NotImplementedException.class, () -> configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, anyString, StandardSync.class));
assertThrows(NotImplementedException.class,
() -> configPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SYNC, anyString, StandardSync.class));
assertThrows(NotImplementedException.class, () -> configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class));
assertThrows(NotImplementedException.class, () -> configPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SYNC, StandardSync.class));
assertThrows(NotImplementedException.class,
() -> configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, anyString, MockData.standardSyncs().get(0)));
assertThrows(NotImplementedException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,21 @@ void testWriteAndGetConfig() throws Exception {
.hasSameElementsAs(List.of(DESTINATION_SNOWFLAKE, DESTINATION_S3));
}

@Test
void testGetConfigWithMetadata() throws Exception {
final Instant now = Instant.now().minus(Duration.ofSeconds(1));
writeDestination(configPersistence, DESTINATION_S3);
final ConfigWithMetadata<StandardDestinationDefinition> configWithMetadata = configPersistence.getConfigWithMetadata(
STANDARD_DESTINATION_DEFINITION,
DESTINATION_S3.getDestinationDefinitionId().toString(),
StandardDestinationDefinition.class);
assertEquals("STANDARD_DESTINATION_DEFINITION", configWithMetadata.getConfigType());
assertTrue(configWithMetadata.getCreatedAt().isAfter(now));
assertTrue(configWithMetadata.getUpdatedAt().isAfter(now));
assertEquals(DESTINATION_S3.getDestinationDefinitionId().toString(), configWithMetadata.getConfigId());
assertEquals(DESTINATION_S3, configWithMetadata.getConfig());
}

@Test
void testListConfigWithMetadata() throws Exception {
final Instant now = Instant.now().minus(Duration.ofSeconds(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.Sets;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -47,14 +48,14 @@ class ValidatingConfigPersistenceTest {
private JsonSchemaValidator schemaValidator;

private ValidatingConfigPersistence configPersistence;
private DatabaseConfigPersistence decoratedConfigPersistence;
private ConfigPersistence decoratedConfigPersistence;
private static final String ERROR_MESSAGE = "error";

@BeforeEach
void setUp() {
schemaValidator = mock(JsonSchemaValidator.class);

decoratedConfigPersistence = mock(DatabaseConfigPersistence.class);
decoratedConfigPersistence = mock(ConfigPersistence.class);
configPersistence = new ValidatingConfigPersistence(decoratedConfigPersistence, schemaValidator);
}

Expand Down Expand Up @@ -143,4 +144,57 @@ void testListConfigsFailure() throws JsonValidationException, IOException {
.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class));
}

@Test
void testGetConfigWithMetadataSuccess() throws IOException, JsonValidationException, ConfigNotFoundException {
when(decoratedConfigPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), StandardSourceDefinition.class))
.thenReturn(withMetadata(SOURCE_1));
final ConfigWithMetadata<StandardSourceDefinition> actualConfig = configPersistence
.getConfigWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), StandardSourceDefinition.class);

assertEquals(withMetadata(SOURCE_1), actualConfig);
}

@Test
void testGetConfigWithMetadataFailure() throws IOException, JsonValidationException, ConfigNotFoundException {
doThrow(new JsonValidationException(ERROR_MESSAGE)).when(schemaValidator).ensure(any(), any());
when(decoratedConfigPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), StandardSourceDefinition.class))
.thenReturn(withMetadata(SOURCE_1));

assertThrows(
JsonValidationException.class,
() -> configPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), StandardSourceDefinition.class));
}

@Test
void testListConfigsWithMetadataSuccess() throws JsonValidationException, IOException {
when(decoratedConfigPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
.thenReturn(List.of(withMetadata(SOURCE_1), withMetadata(SOURCE_2)));

final List<ConfigWithMetadata<StandardSourceDefinition>> actualConfigs = configPersistence
.listConfigsWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class);

// noinspection unchecked
assertEquals(
Sets.newHashSet(withMetadata(SOURCE_1), withMetadata(SOURCE_2)),
Sets.newHashSet(actualConfigs));
}

@Test
void testListConfigsWithMetadataFailure() throws JsonValidationException, IOException {
doThrow(new JsonValidationException(ERROR_MESSAGE)).when(schemaValidator).ensure(any(), any());
when(decoratedConfigPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
.thenReturn(List.of(withMetadata(SOURCE_1), withMetadata(SOURCE_2)));

assertThrows(JsonValidationException.class, () -> configPersistence
.listConfigsWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class));
}

private static ConfigWithMetadata<StandardSourceDefinition> withMetadata(final StandardSourceDefinition sourceDef) {
return new ConfigWithMetadata<>(sourceDef.getSourceDefinitionId().toString(),
ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
INSTANT,
INSTANT,
sourceDef);
}

}

0 comments on commit c7f9935

Please sign in to comment.