From e9644bb8a653029755f02c6af861432d2308d357 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Mon, 14 Nov 2022 10:16:17 -0800 Subject: [PATCH] Check protocol version compatibility during a platform update (#19200) * Refactoring to improve code re-use * Add ProtocolVersionChecker * Add an option to configure if we are automatically upgrading connectors * Add airbyte version check to pass the fresh install case * Inject DefinitionsProvider in the BootloaderApp * Remove AutoUpgradeConnector config * Improve logging * Use named argument rather than positional * Make DefinitionsProvider optional * Format --- .../io/airbyte/bootloader/BootloaderApp.java | 90 ++++- .../bootloader/ProtocolVersionChecker.java | 222 +++++++++++ .../airbyte/bootloader/BootloaderAppTest.java | 15 +- .../ProtocolVersionCheckerTest.java | 347 ++++++++++++++++++ .../persistence/ActorDefinitionMigrator.java | 29 +- .../config/persistence/ConfigRepository.java | 5 + .../config/persistence/ConfigWriter.java | 43 +++ 7 files changed, 703 insertions(+), 48 deletions(-) create mode 100644 airbyte-bootloader/src/main/java/io/airbyte/bootloader/ProtocolVersionChecker.java create mode 100644 airbyte-bootloader/src/test/java/io/airbyte/bootloader/ProtocolVersionCheckerTest.java diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java index 1132b5a46ef2b..de39f8a253396 100644 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java @@ -8,8 +8,8 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.lang.CloseableShutdownHook; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.AirbyteVersion; -import io.airbyte.commons.version.Version; import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; import io.airbyte.config.Geography; @@ -70,7 +70,7 @@ public class BootloaderApp { private final FeatureFlags featureFlags; private final SecretMigrator secretMigrator; private ConfigRepository configRepository; - private DefinitionsProvider localDefinitionsProvider; + private Optional definitionsProvider; private Database configDatabase; private Database jobDatabase; private JobPersistence jobPersistence; @@ -79,6 +79,13 @@ public class BootloaderApp { private final DSLContext configsDslContext; private final DSLContext jobsDslContext; + // This controls how we check the protocol version compatibility + // True means that the connectors will be forcefully upgraded regardless of whether they are used in + // an active sync or not. + // This should be moved to a Configs, however, this behavior is currently forced through hooks that + // are passed as the postLoadExecution. + private final boolean autoUpgradeConnectors; + /** * This method is exposed for Airbyte Cloud consumption. This lets us override the seed loading * logic and customise Cloud connector versions. Please check with the Platform team before making @@ -97,7 +104,9 @@ public BootloaderApp(final Configs configs, final DSLContext configsDslContext, final DSLContext jobsDslContext, final Flyway configsFlyway, - final Flyway jobsFlyway) { + final Flyway jobsFlyway, + final Optional definitionsProvider, + final boolean autoUpgradeConnectors) { this.configs = configs; this.postLoadExecution = postLoadExecution; this.featureFlags = featureFlags; @@ -106,11 +115,16 @@ public BootloaderApp(final Configs configs, this.configsFlyway = configsFlyway; this.jobsDslContext = jobsDslContext; this.jobsFlyway = jobsFlyway; + this.definitionsProvider = definitionsProvider; + this.autoUpgradeConnectors = autoUpgradeConnectors; initPersistences(configsDslContext, jobsDslContext); } + // Temporary duplication of constructor, to remove once Cloud has been migrated to the one above. + @Deprecated(forRemoval = true) public BootloaderApp(final Configs configs, + final Runnable postLoadExecution, final FeatureFlags featureFlags, final SecretMigrator secretMigrator, final DSLContext configsDslContext, @@ -118,18 +132,48 @@ public BootloaderApp(final Configs configs, final Flyway configsFlyway, final Flyway jobsFlyway) { this.configs = configs; + this.postLoadExecution = postLoadExecution; this.featureFlags = featureFlags; this.secretMigrator = secretMigrator; this.configsDslContext = configsDslContext; this.configsFlyway = configsFlyway; this.jobsDslContext = jobsDslContext; this.jobsFlyway = jobsFlyway; + this.autoUpgradeConnectors = false; + + try { + this.definitionsProvider = Optional.of(getLocalDefinitionsProvider()); + } catch (final IOException e) { + LOGGER.error("Unable to initialize persistence.", e); + } + + initPersistences(configsDslContext, jobsDslContext); + } + + public BootloaderApp(final Configs configs, + final FeatureFlags featureFlags, + final SecretMigrator secretMigrator, + final DSLContext configsDslContext, + final DSLContext jobsDslContext, + final Flyway configsFlyway, + final Flyway jobsFlyway, + final DefinitionsProvider definitionsProvider, + final boolean autoUpgradeConnectors) { + this.configs = configs; + this.featureFlags = featureFlags; + this.secretMigrator = secretMigrator; + this.configsDslContext = configsDslContext; + this.configsFlyway = configsFlyway; + this.jobsDslContext = jobsDslContext; + this.jobsFlyway = jobsFlyway; + this.definitionsProvider = Optional.of(definitionsProvider); + this.autoUpgradeConnectors = autoUpgradeConnectors; initPersistences(configsDslContext, jobsDslContext); postLoadExecution = () -> { try { - final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, localDefinitionsProvider); + final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get()); applyDefinitionsHelper.apply(); if (featureFlags.forceSecretMigration() || !jobPersistence.isSecretMigrated()) { @@ -159,10 +203,9 @@ public void load() throws Exception { final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion(); assertNonBreakingMigration(jobPersistence, currAirbyteVersion); - final Version airbyteProtocolVersionMax = configs.getAirbyteProtocolVersionMax(); - final Version airbyteProtocolVersionMin = configs.getAirbyteProtocolVersionMin(); - // TODO ProtocolVersion validation should happen here - trackProtocolVersion(airbyteProtocolVersionMin, airbyteProtocolVersionMax); + final ProtocolVersionChecker protocolVersionChecker = + new ProtocolVersionChecker(jobPersistence, configs, configRepository, definitionsProvider); + assertNonBreakingProtocolVersionConstraints(protocolVersionChecker, jobPersistence, autoUpgradeConnectors); // TODO Will be converted to an injected singleton during DI migration final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); @@ -191,7 +234,7 @@ private static Database getConfigDatabase(final DSLContext dslContext) throws IO return new Database(dslContext); } - private static DefinitionsProvider getLocalDefinitionsProvider() throws IOException { + static DefinitionsProvider getLocalDefinitionsProvider() throws IOException { return new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); } @@ -207,7 +250,6 @@ private void initPersistences(final DSLContext configsDslContext, final DSLConte try { configDatabase = getConfigDatabase(configsDslContext); configRepository = new ConfigRepository(configDatabase); - localDefinitionsProvider = getLocalDefinitionsProvider(); jobDatabase = getJobDatabase(jobsDslContext); jobPersistence = getJobPersistence(jobDatabase); } catch (final IOException e) { @@ -249,7 +291,11 @@ public static void main(final String[] args) throws Exception { // Ensure that the database resources are closed on application shutdown CloseableShutdownHook.registerRuntimeShutdownHook(configsDataSource, jobsDataSource, configsDslContext, jobsDslContext); - final var bootloader = new BootloaderApp(configs, featureFlags, secretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + final DefinitionsProvider definitionsProvider = getLocalDefinitionsProvider(); + + final var bootloader = + new BootloaderApp(configs, featureFlags, secretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, definitionsProvider, + false); bootloader.load(); } } @@ -307,10 +353,24 @@ private static void assertNonBreakingMigration(final JobPersistence jobPersisten } } - private void trackProtocolVersion(final Version airbyteProtocolVersionMin, final Version airbyteProtocolVersionMax) throws IOException { - jobPersistence.setAirbyteProtocolVersionMin(airbyteProtocolVersionMin); - jobPersistence.setAirbyteProtocolVersionMax(airbyteProtocolVersionMax); - LOGGER.info("AirbyteProtocol version support range [{}:{}]", airbyteProtocolVersionMin.serialize(), airbyteProtocolVersionMax.serialize()); + private static void assertNonBreakingProtocolVersionConstraints(final ProtocolVersionChecker protocolVersionChecker, + final JobPersistence jobPersistence, + final boolean autoUpgradeConnectors) + throws Exception { + final Optional newProtocolRange = protocolVersionChecker.validate(autoUpgradeConnectors); + if (newProtocolRange.isEmpty()) { + throw new RuntimeException( + "Aborting bootloader to avoid breaking existing connection after an upgrade. " + + "Please address airbyte protocol version support issues in the connectors before retrying."); + } + trackProtocolVersion(jobPersistence, newProtocolRange.get()); + } + + private static void trackProtocolVersion(final JobPersistence jobPersistence, final AirbyteProtocolVersionRange protocolVersionRange) + throws IOException { + jobPersistence.setAirbyteProtocolVersionMin(protocolVersionRange.min()); + jobPersistence.setAirbyteProtocolVersionMax(protocolVersionRange.max()); + LOGGER.info("AirbyteProtocol version support range [{}:{}]", protocolVersionRange.min().serialize(), protocolVersionRange.max().serialize()); } static boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) { diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/ProtocolVersionChecker.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/ProtocolVersionChecker.java new file mode 100644 index 0000000000000..a1a3348454845 --- /dev/null +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/ProtocolVersionChecker.java @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader; + +import io.airbyte.commons.version.AirbyteProtocolVersion; +import io.airbyte.commons.version.AirbyteProtocolVersionRange; +import io.airbyte.commons.version.AirbyteVersion; +import io.airbyte.commons.version.Version; +import io.airbyte.config.ActorType; +import io.airbyte.config.Configs; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.init.DefinitionsProvider; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.persistence.job.JobPersistence; +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ProtocolVersionChecker { + + private final JobPersistence jobPersistence; + private final Configs configs; + private final ConfigRepository configRepository; + private final Optional definitionsProvider; + + // Dependencies could be simplified once we break some pieces up: + // * JobPersistence for accessing the airbyte_metadata table. + // * Configs for getting the new Airbyte Protocol Range from the env vars. + // * ConfigRepository for accessing ActorDefinitions + public ProtocolVersionChecker(final JobPersistence jobPersistence, + final Configs configs, + final ConfigRepository configRepository, + final Optional definitionsProvider) { + this.jobPersistence = jobPersistence; + this.configs = configs; + this.configRepository = configRepository; + this.definitionsProvider = definitionsProvider; + } + + /** + * Validate the AirbyteProtocolVersion support range between the platform and the connectors. + *

+ * The goal is to make sure that we do not end up disabling existing connections after an upgrade + * that changes the protocol support range. + * + * @param supportAutoUpgrade whether the connectors will be automatically upgraded by the platform + * @return the supported protocol version range if check is successful, Optional.empty() if we would + * break existing connections. + * @throws IOException + */ + public Optional validate(final boolean supportAutoUpgrade) throws IOException { + final Optional currentAirbyteVersion = getCurrentAirbyteVersion(); + final AirbyteProtocolVersionRange currentRange = getCurrentProtocolVersionRange(); + final AirbyteProtocolVersionRange targetRange = getTargetProtocolVersionRange(); + + // Checking if there is a pre-existing version of airbyte. + // Without this check, the first run of the validation would fail because we do not have the tables + // set yet + // which means that the actor definitions lookup will throw SQLExceptions. + if (currentAirbyteVersion.isEmpty()) { + log.info("No previous version of Airbyte detected, assuming this is a fresh deploy."); + return Optional.of(targetRange); + } + + if (currentRange.equals(targetRange)) { + log.info("Using AirbyteProtocolVersion range [{}:{}]", targetRange.min().serialize(), targetRange.max().serialize()); + return Optional.of(targetRange); + } + + log.info("Detected an AirbyteProtocolVersion range change from [{}:{}] to [{}:{}]", + currentRange.min().serialize(), currentRange.max().serialize(), + targetRange.min().serialize(), targetRange.max().serialize()); + + final Map> conflicts = getConflictingActorDefinitions(targetRange); + + if (conflicts.isEmpty()) { + log.info("No protocol version conflict detected."); + return Optional.of(targetRange); + } + + final Set destConflicts = conflicts.getOrDefault(ActorType.DESTINATION, new HashSet<>()); + final Set sourceConflicts = conflicts.getOrDefault(ActorType.SOURCE, new HashSet<>()); + + if (!supportAutoUpgrade) { + // If we do not support auto upgrade, any conflict of used connectors must be resolved before being + // able to upgrade the platform. + log.warn("The following connectors need to be upgraded before being able to upgrade the platform"); + formatActorDefinitionForLogging(destConflicts, sourceConflicts).forEach(log::warn); + return Optional.empty(); + } + + final Set remainingDestConflicts = + projectRemainingConflictsAfterConnectorUpgrades(targetRange, destConflicts, ActorType.DESTINATION); + final Set remainingSourceConflicts = + projectRemainingConflictsAfterConnectorUpgrades(targetRange, sourceConflicts, ActorType.SOURCE); + + if (!remainingDestConflicts.isEmpty() || !remainingSourceConflicts.isEmpty()) { + // These set of connectors need a manual intervention because there is no compatible version listed + formatActorDefinitionForLogging(remainingDestConflicts, remainingSourceConflicts).forEach(log::warn); + return Optional.empty(); + } + + // These can be auto upgraded + destConflicts.removeAll(remainingDestConflicts); + sourceConflicts.removeAll(remainingSourceConflicts); + log.info("The following connectors will be upgraded"); + formatActorDefinitionForLogging(destConflicts, sourceConflicts).forEach(log::info); + return Optional.of(targetRange); + } + + protected Optional getCurrentAirbyteVersion() throws IOException { + return jobPersistence.getVersion().map(AirbyteVersion::new); + } + + protected AirbyteProtocolVersionRange getCurrentProtocolVersionRange() throws IOException { + Optional min = jobPersistence.getAirbyteProtocolVersionMin(); + Optional max = jobPersistence.getAirbyteProtocolVersionMax(); + + if (min.isPresent() != max.isPresent()) { + // Flagging this because this would be highly suspicious but not bad enough that we should fail + // hard. + // If the new config is fine, the system should self-heal. + log.warn("Inconsistent AirbyteProtocolVersion found, only one of min/max was found. (min:{}, max:{})", + min.map(Version::serialize).orElse(""), max.map(Version::serialize).orElse("")); + } + + return new AirbyteProtocolVersionRange(min.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION), + max.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION)); + } + + protected AirbyteProtocolVersionRange getTargetProtocolVersionRange() { + return new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax()); + } + + protected Map> getConflictingActorDefinitions(final AirbyteProtocolVersionRange targetRange) throws IOException { + final Map> actorDefIdToProtocolVersion = configRepository.getActorDefinitionToProtocolVersionMap(); + final Map> conflicts = + actorDefIdToProtocolVersion.entrySet().stream() + // Keeping only ActorDefinitionIds that have an unsupported protocol version + .filter(e -> !targetRange.isSupported(e.getValue().getValue())) + // Build the ActorType -> List[ActorDefIds] map + .map(e -> Map.entry(e.getValue().getKey(), e.getKey())) + // Group by ActorType and transform the List> into a Set + .collect(Collectors.groupingBy(Entry::getKey, + Collectors.collectingAndThen(Collectors.toList(), list -> list.stream().map(Entry::getValue).collect(Collectors.toSet())))); + return conflicts; + } + + protected Set projectRemainingConflictsAfterConnectorUpgrades(final AirbyteProtocolVersionRange targetRange, + final Set initialConflicts, + final ActorType actorType) { + if (initialConflicts.isEmpty()) { + return Set.of(); + } + + final Set upgradedSourceDefs = getProtocolVersionsForActorDefinitions(actorType) + // Keep definition ids if the protocol version will fall into the new supported range + .filter(e -> initialConflicts.contains(e.getKey()) && targetRange.isSupported(e.getValue())) + .map(Entry::getKey) + .collect(Collectors.toSet()); + + // Get the set of source definitions that will still have conflict after the connector upgrades + final Set remainingConflicts = new HashSet<>(initialConflicts); + remainingConflicts.removeAll(upgradedSourceDefs); + return remainingConflicts; + } + + protected Stream> getProtocolVersionsForActorDefinitions(final ActorType actorType) { + if (definitionsProvider.isEmpty()) { + return Stream.empty(); + } + + Stream> stream; + if (actorType == ActorType.SOURCE) { + stream = definitionsProvider.get().getSourceDefinitions() + .stream() + .map(def -> Map.entry(def.getSourceDefinitionId(), AirbyteProtocolVersion.getWithDefault(def.getSpec().getProtocolVersion()))); + } else { + stream = definitionsProvider.get().getDestinationDefinitions() + .stream() + .map(def -> Map.entry(def.getDestinationDefinitionId(), AirbyteProtocolVersion.getWithDefault(def.getSpec().getProtocolVersion()))); + } + return stream; + } + + private Stream formatActorDefinitionForLogging(final Set remainingDestConflicts, final Set remainingSourceConflicts) { + return Stream.concat( + remainingSourceConflicts.stream().map(defId -> { + final StandardSourceDefinition sourceDef; + try { + sourceDef = configRepository.getStandardSourceDefinition(defId); + return String.format("Source: %s: %s: protocol version: %s", + sourceDef.getSourceDefinitionId(), sourceDef.getName(), sourceDef.getProtocolVersion()); + } catch (Exception e) { + log.info("Failed to getStandardSourceDefinition for {}", defId, e); + return String.format("Source: %s: Failed to fetch details...", defId); + } + }), + remainingDestConflicts.stream().map(defId -> { + try { + final StandardDestinationDefinition destDef = configRepository.getStandardDestinationDefinition(defId); + return String.format("Destination: %s: %s: protocol version: %s", + destDef.getDestinationDefinitionId(), destDef.getName(), destDef.getProtocolVersion()); + } catch (Exception e) { + log.info("Failed to getStandardDestinationDefinition for {}", defId, e); + return String.format("Source: %s: Failed to fetch details...", defId); + } + })); + } + +} diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index c85fb790813a4..a8dd4f653e8d8 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -132,7 +132,8 @@ void testBootloaderAppBlankDb() throws Exception { val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false); val bootloader = - new BootloaderApp(mockedConfigs, mockedFeatureFlags, mockedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + new BootloaderApp(mockedConfigs, mockedFeatureFlags, mockedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, + BootloaderApp.getLocalDefinitionsProvider(), false); bootloader.load(); val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); @@ -200,7 +201,8 @@ void testBootloaderAppRunSecretMigration() throws Exception { environmentVariables.set("DATABASE_URL", container.getJdbcUrl()); // Bootstrap the database for the test - val initBootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags, null, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + val initBootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags, null, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, + BootloaderApp.getLocalDefinitionsProvider(), false); initBootloader.load(); final DefinitionsProvider localDefinitions = new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); @@ -241,7 +243,8 @@ void testBootloaderAppRunSecretMigration() throws Exception { // Perform secrets migration var bootloader = - new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, + BootloaderApp.getLocalDefinitionsProvider(), false); boolean isMigrated = jobsPersistence.isSecretMigrated(); assertFalse(isMigrated); @@ -261,7 +264,8 @@ void testBootloaderAppRunSecretMigration() throws Exception { reset(spiedSecretMigrator); // We need to re-create the bootloader because it is closing the persistence after running load bootloader = - new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, + BootloaderApp.getLocalDefinitionsProvider(), false); bootloader.load(); verifyNoInteractions(spiedSecretMigrator); @@ -270,7 +274,8 @@ void testBootloaderAppRunSecretMigration() throws Exception { when(mockedFeatureFlags.forceSecretMigration()).thenReturn(true); // We need to re-create the bootloader because it is closing the persistence after running load bootloader = - new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, + BootloaderApp.getLocalDefinitionsProvider(), false); bootloader.load(); verify(spiedSecretMigrator).migrateSecrets(); } diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/ProtocolVersionCheckerTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/ProtocolVersionCheckerTest.java new file mode 100644 index 0000000000000..5b6a6c710bf15 --- /dev/null +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/ProtocolVersionCheckerTest.java @@ -0,0 +1,347 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.airbyte.commons.version.AirbyteProtocolVersionRange; +import io.airbyte.commons.version.Version; +import io.airbyte.config.ActorType; +import io.airbyte.config.Configs; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.init.DefinitionsProvider; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.protocol.models.ConnectorSpecification; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class ProtocolVersionCheckerTest { + + Configs configs; + ConfigRepository configRepository; + DefinitionsProvider definitionsProvider; + JobPersistence jobPersistence; + ProtocolVersionChecker protocolVersionChecker; + + final Version V0_0_0 = new Version("0.0.0"); + final Version V1_0_0 = new Version("1.0.0"); + final Version V2_0_0 = new Version("2.0.0"); + + @BeforeEach + void beforeEach() throws IOException { + configs = mock(Configs.class); + configRepository = mock(ConfigRepository.class); + definitionsProvider = mock(DefinitionsProvider.class); + jobPersistence = mock(JobPersistence.class); + protocolVersionChecker = new ProtocolVersionChecker(jobPersistence, configs, configRepository, Optional.of(definitionsProvider)); + + when(jobPersistence.getVersion()).thenReturn(Optional.of("1.2.3")); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testFirstInstallCheck(final boolean supportAutoUpgrade) throws IOException { + when(jobPersistence.getVersion()).thenReturn(Optional.empty()); + setTargetProtocolRangeRange(V0_0_0, V1_0_0); + + assertEquals(Optional.of(new AirbyteProtocolVersionRange(V0_0_0, V1_0_0)), protocolVersionChecker.validate(supportAutoUpgrade)); + } + + @Test + void testGetCurrentRange() throws IOException { + setCurrentProtocolRangeRange(V0_0_0, V1_0_0); + + assertEquals(new AirbyteProtocolVersionRange(V0_0_0, V1_0_0), protocolVersionChecker.getCurrentProtocolVersionRange()); + } + + @Test + void testGetTargetRange() throws IOException { + setTargetProtocolRangeRange(V1_0_0, V2_0_0); + + assertEquals(new AirbyteProtocolVersionRange(V1_0_0, V2_0_0), protocolVersionChecker.getTargetProtocolVersionRange()); + } + + @Test + void testRetrievingCurrentConflicts() throws IOException { + final AirbyteProtocolVersionRange targetRange = new AirbyteProtocolVersionRange(V1_0_0, V2_0_0); + + final UUID source1 = UUID.randomUUID(); + final UUID source2 = UUID.randomUUID(); + final UUID source3 = UUID.randomUUID(); + final UUID dest1 = UUID.randomUUID(); + final UUID dest2 = UUID.randomUUID(); + + final Map> initialActorDefinitions = Map.of( + source1, Map.entry(ActorType.SOURCE, V0_0_0), + source2, Map.entry(ActorType.SOURCE, V1_0_0), + source3, Map.entry(ActorType.SOURCE, V2_0_0), + dest1, Map.entry(ActorType.DESTINATION, V0_0_0), + dest2, Map.entry(ActorType.DESTINATION, V0_0_0)); + when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); + + final Map> conflicts = protocolVersionChecker.getConflictingActorDefinitions(targetRange); + + final Map> expectedConflicts = Map.of( + ActorType.DESTINATION, Set.of(dest1, dest2), + ActorType.SOURCE, Set.of(source1)); + assertEquals(expectedConflicts, conflicts); + } + + @Test + void testRetrievingCurrentConflictsWhenNoConflicts() throws IOException { + final AirbyteProtocolVersionRange targetRange = new AirbyteProtocolVersionRange(V1_0_0, V2_0_0); + + final UUID source1 = UUID.randomUUID(); + final UUID dest1 = UUID.randomUUID(); + + final Map> initialActorDefinitions = Map.of( + source1, Map.entry(ActorType.SOURCE, V2_0_0), + dest1, Map.entry(ActorType.DESTINATION, V1_0_0)); + when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); + + final Map> conflicts = protocolVersionChecker.getConflictingActorDefinitions(targetRange); + + assertEquals(Map.of(), conflicts); + } + + @Test + void testProjectRemainingSourceConflicts() { + final AirbyteProtocolVersionRange targetRange = new AirbyteProtocolVersionRange(V1_0_0, V2_0_0); + + final UUID unrelatedSource = UUID.randomUUID(); + final UUID upgradedSource = UUID.randomUUID(); + final UUID notChangedSource = UUID.randomUUID(); + final UUID missingSource = UUID.randomUUID(); + final Set initialConflicts = Set.of(upgradedSource, notChangedSource, missingSource); + + setNewSourceDefinitions(List.of( + Map.entry(unrelatedSource, V2_0_0), + Map.entry(upgradedSource, V1_0_0), + Map.entry(notChangedSource, V0_0_0))); + + final Set actualConflicts = + protocolVersionChecker.projectRemainingConflictsAfterConnectorUpgrades(targetRange, initialConflicts, ActorType.SOURCE); + + final Set expectedConflicts = Set.of(notChangedSource, missingSource); + assertEquals(expectedConflicts, actualConflicts); + } + + @Test + void testProjectRemainingDestinationConflicts() { + final AirbyteProtocolVersionRange targetRange = new AirbyteProtocolVersionRange(V1_0_0, V2_0_0); + + final UUID dest1 = UUID.randomUUID(); + final UUID dest2 = UUID.randomUUID(); + final UUID dest3 = UUID.randomUUID(); + final Set initialConflicts = Set.of(dest1, dest2, dest3); + + setNewDestinationDefinitions(List.of( + Map.entry(dest1, V2_0_0), + Map.entry(dest2, V1_0_0), + Map.entry(dest3, V2_0_0))); + + final Set actualConflicts = + protocolVersionChecker.projectRemainingConflictsAfterConnectorUpgrades(targetRange, initialConflicts, ActorType.DESTINATION); + + final Set expectedConflicts = Set.of(); + assertEquals(expectedConflicts, actualConflicts); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testValidateSameRange(final boolean supportAutoUpgrade) throws Exception { + setCurrentProtocolRangeRange(V0_0_0, V2_0_0); + setTargetProtocolRangeRange(V0_0_0, V2_0_0); + + final Optional range = protocolVersionChecker.validate(supportAutoUpgrade); + assertEquals(Optional.of(new AirbyteProtocolVersionRange(V0_0_0, V2_0_0)), range); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testValidateAllConnectorsAreUpgraded(final boolean supportAutoUpgrade) throws Exception { + setCurrentProtocolRangeRange(V0_0_0, V1_0_0); + setTargetProtocolRangeRange(V1_0_0, V2_0_0); + + final UUID source1 = UUID.randomUUID(); + final UUID source2 = UUID.randomUUID(); + final UUID source3 = UUID.randomUUID(); + final UUID source4 = UUID.randomUUID(); + final UUID dest1 = UUID.randomUUID(); + final UUID dest2 = UUID.randomUUID(); + final UUID dest3 = UUID.randomUUID(); + + final Map> initialActorDefinitions = Map.of( + source1, Map.entry(ActorType.SOURCE, V0_0_0), + source2, Map.entry(ActorType.SOURCE, V1_0_0), + source3, Map.entry(ActorType.SOURCE, V0_0_0), + source4, Map.entry(ActorType.SOURCE, V0_0_0), + dest1, Map.entry(ActorType.DESTINATION, V0_0_0), + dest2, Map.entry(ActorType.DESTINATION, V1_0_0), + dest3, Map.entry(ActorType.DESTINATION, V2_0_0)); + when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); + + setNewSourceDefinitions(List.of( + Map.entry(source1, V1_0_0), + Map.entry(source2, V1_0_0), + Map.entry(source3, V2_0_0), + Map.entry(source4, V1_0_0))); + setNewDestinationDefinitions(List.of( + Map.entry(dest1, V1_0_0), + Map.entry(dest2, V1_0_0), + Map.entry(dest3, V2_0_0))); + + final Optional actualRange = protocolVersionChecker.validate(supportAutoUpgrade); + + // Without auto upgrade, we will fail the validation because it would require connector automatic + // actor definition + // upgrade for used sources/destinations. + if (supportAutoUpgrade) { + assertEquals(Optional.of(new AirbyteProtocolVersionRange(V1_0_0, V2_0_0)), actualRange); + } else { + assertEquals(Optional.empty(), actualRange); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testValidateBadUpgradeMissingSource(final boolean supportAutoUpgrade) throws Exception { + setCurrentProtocolRangeRange(V0_0_0, V1_0_0); + setTargetProtocolRangeRange(V1_0_0, V2_0_0); + + final UUID source1 = UUID.randomUUID(); + final UUID source2 = UUID.randomUUID(); + final UUID dest1 = UUID.randomUUID(); + final UUID dest2 = UUID.randomUUID(); + + final Map> initialActorDefinitions = Map.of( + source1, Map.entry(ActorType.SOURCE, V0_0_0), + source2, Map.entry(ActorType.SOURCE, V0_0_0), + dest1, Map.entry(ActorType.DESTINATION, V0_0_0), + dest2, Map.entry(ActorType.DESTINATION, V0_0_0)); + when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); + + setNewSourceDefinitions(List.of( + Map.entry(source1, V0_0_0), + Map.entry(source2, V1_0_0))); + setNewDestinationDefinitions(List.of( + Map.entry(dest1, V1_0_0), + Map.entry(dest2, V1_0_0))); + + final Optional actualRange = protocolVersionChecker.validate(supportAutoUpgrade); + assertEquals(Optional.empty(), actualRange); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testValidateBadUpgradeMissingDestination(final boolean supportAutoUpgrade) throws Exception { + setCurrentProtocolRangeRange(V0_0_0, V1_0_0); + setTargetProtocolRangeRange(V1_0_0, V2_0_0); + + final UUID source1 = UUID.randomUUID(); + final UUID source2 = UUID.randomUUID(); + final UUID dest1 = UUID.randomUUID(); + final UUID dest2 = UUID.randomUUID(); + + final Map> initialActorDefinitions = Map.of( + source1, Map.entry(ActorType.SOURCE, V0_0_0), + source2, Map.entry(ActorType.SOURCE, V0_0_0), + dest1, Map.entry(ActorType.DESTINATION, V0_0_0), + dest2, Map.entry(ActorType.DESTINATION, V0_0_0)); + when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); + + setNewSourceDefinitions(List.of( + Map.entry(source1, V1_0_0), + Map.entry(source2, V1_0_0))); + setNewDestinationDefinitions(List.of( + Map.entry(dest1, V1_0_0), + Map.entry(dest2, V0_0_0))); + + final Optional actualRange = protocolVersionChecker.validate(supportAutoUpgrade); + assertEquals(Optional.empty(), actualRange); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testValidateFailsOnProtocolRangeChangeWithoutDefinitionsProvider(final boolean supportAutoUpgrade) throws Exception { + protocolVersionChecker = new ProtocolVersionChecker(jobPersistence, configs, configRepository, Optional.empty()); + + setCurrentProtocolRangeRange(V0_0_0, V1_0_0); + setTargetProtocolRangeRange(V1_0_0, V2_0_0); + + final UUID source1 = UUID.randomUUID(); + final UUID dest1 = UUID.randomUUID(); + + final Map> initialActorDefinitions = Map.of( + source1, Map.entry(ActorType.SOURCE, V0_0_0), + dest1, Map.entry(ActorType.DESTINATION, V0_0_0)); + when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); + + final Optional actualRange = protocolVersionChecker.validate(supportAutoUpgrade); + assertEquals(Optional.empty(), actualRange); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testValidateSucceedsWhenNoProtocolRangeChangeWithoutDefinitionsProvider(final boolean supportAutoUpgrade) throws Exception { + protocolVersionChecker = new ProtocolVersionChecker(jobPersistence, configs, configRepository, Optional.empty()); + + setCurrentProtocolRangeRange(V0_0_0, V2_0_0); + setTargetProtocolRangeRange(V0_0_0, V2_0_0); + + final UUID source1 = UUID.randomUUID(); + final UUID dest1 = UUID.randomUUID(); + + final Map> initialActorDefinitions = Map.of( + source1, Map.entry(ActorType.SOURCE, V0_0_0), + dest1, Map.entry(ActorType.DESTINATION, V0_0_0)); + when(configRepository.getActorDefinitionToProtocolVersionMap()).thenReturn(initialActorDefinitions); + + final Optional actualRange = protocolVersionChecker.validate(supportAutoUpgrade); + assertEquals(Optional.of(new AirbyteProtocolVersionRange(V0_0_0, V2_0_0)), actualRange); + } + + private void setCurrentProtocolRangeRange(final Version min, final Version max) throws IOException { + when(jobPersistence.getAirbyteProtocolVersionMin()).thenReturn(Optional.of(min)); + when(jobPersistence.getAirbyteProtocolVersionMax()).thenReturn(Optional.of(max)); + } + + private void setTargetProtocolRangeRange(final Version min, final Version max) throws IOException { + when(configs.getAirbyteProtocolVersionMin()).thenReturn(min); + when(configs.getAirbyteProtocolVersionMax()).thenReturn(max); + } + + private void setNewDestinationDefinitions(final List> defs) { + final List destDefinitions = defs.stream() + .map(e -> new StandardDestinationDefinition() + .withDestinationDefinitionId(e.getKey()) + .withSpec(new ConnectorSpecification().withProtocolVersion(e.getValue().serialize()))) + .toList(); + when(definitionsProvider.getDestinationDefinitions()).thenReturn(destDefinitions); + } + + private void setNewSourceDefinitions(final List> defs) { + final List sourceDefinitions = defs.stream() + .map(e -> new StandardSourceDefinition() + .withSourceDefinitionId(e.getKey()) + .withSpec(new ConnectorSpecification().withProtocolVersion(e.getValue().serialize()))) + .toList(); + when(definitionsProvider.getSourceDefinitions()).thenReturn(sourceDefinitions); + } + +} diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java index 7e53abb011629..51b37e9d77950 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java @@ -4,7 +4,6 @@ package io.airbyte.config.persistence; -import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR; import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION; import static org.jooq.impl.DSL.asterisk; @@ -32,11 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.jooq.DSLContext; -import org.jooq.Record1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +71,7 @@ void updateConfigsFromSeed(final DSLContext ctx, throws IOException { LOGGER.info("Updating connector definitions from the seed if necessary..."); - final Set connectorRepositoriesInUse = getConnectorRepositoriesInUse(ctx); + final Set connectorRepositoriesInUse = ConfigWriter.getConnectorRepositoriesInUse(ctx); LOGGER.info("Connectors in use: {}", connectorRepositoriesInUse); final Map connectorRepositoryToInfoMap = getConnectorRepositoryToInfoMap(ctx); @@ -97,29 +93,6 @@ void updateConfigsFromSeed(final DSLContext ctx, LOGGER.info("Connector definitions have been updated ({} new connectors, and {} updates)", newConnectorCount, updatedConnectorCount); } - /** - * @return A set of connectors (both source and destination) that are already used in standard - * syncs. We identify connectors by its repository name instead of definition id because - * connectors can be added manually by users, and their config ids are not always the same - * as those in the seed. - */ - private Set getConnectorRepositoriesInUse(final DSLContext ctx) { - final Set usedConnectorDefinitionIds = ctx - .select(ACTOR.ACTOR_DEFINITION_ID) - .from(ACTOR) - .fetch() - .stream() - .flatMap(row -> Stream.of(row.value1())) - .collect(Collectors.toSet()); - - return ctx.select(ACTOR_DEFINITION.DOCKER_REPOSITORY) - .from(ACTOR_DEFINITION) - .where(ACTOR_DEFINITION.ID.in(usedConnectorDefinitionIds)) - .fetch().stream() - .map(Record1::value1) - .collect(Collectors.toSet()); - } - /** * @return A map about current connectors (both source and destination). It maps from connector * repository to its definition id and docker image tag. We identify a connector by its diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index d909cf47f232f..e8f20a29a4329 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -28,6 +28,7 @@ import io.airbyte.commons.lang.MoreBooleans; import io.airbyte.commons.version.AirbyteProtocolVersion; import io.airbyte.commons.version.AirbyteProtocolVersionRange; +import io.airbyte.commons.version.Version; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.ConfigSchema; @@ -250,6 +251,10 @@ public List listStandardSourceDefinitions(final boolea return sourceDefinitions; } + public Map> getActorDefinitionToProtocolVersionMap() throws IOException { + return database.query(ConfigWriter::getActorDefinitionsInUseToProtocolVersion); + } + public List listPublicSourceDefinitions(final boolean includeTombstone) throws IOException { return listStandardActorDefinitions( ActorType.source, diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigWriter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigWriter.java index efcca3a997a22..fc20758a6cadb 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigWriter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigWriter.java @@ -4,8 +4,13 @@ package io.airbyte.config.persistence; +import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR; +import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION; + import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.version.AirbyteProtocolVersion; +import io.airbyte.commons.version.Version; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.db.instance.configs.jooq.generated.Tables; @@ -15,8 +20,15 @@ import java.time.LocalDate; import java.time.OffsetDateTime; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.jooq.DSLContext; import org.jooq.JSONB; +import org.jooq.Record4; import org.jooq.impl.DSL; /** @@ -29,6 +41,37 @@ @SuppressWarnings("PMD.CognitiveComplexity") public class ConfigWriter { + /** + * @return A set of connectors (both source and destination) that are already used in standard + * syncs. We identify connectors by its repository name instead of definition id because + * connectors can be added manually by users, and their config ids are not always the same + * as those in the seed. + */ + static Set getConnectorRepositoriesInUse(final DSLContext ctx) { + return getActorDefinitionsInUse(ctx) + .map(r -> r.get(ACTOR_DEFINITION.DOCKER_REPOSITORY)) + .collect(Collectors.toSet()); + } + + /** + * Get a map of connector to protocol version for all the connectors that are used in a standard + * syncs. + */ + static Map> getActorDefinitionsInUseToProtocolVersion(final DSLContext ctx) { + return getActorDefinitionsInUse(ctx) + .collect(Collectors.toMap(r -> r.get(ACTOR_DEFINITION.ID), + r -> Map.entry( + r.get(ACTOR_DEFINITION.ACTOR_TYPE) == ActorType.source ? io.airbyte.config.ActorType.SOURCE : io.airbyte.config.ActorType.DESTINATION, + AirbyteProtocolVersion.getWithDefault(r.get(ACTOR_DEFINITION.PROTOCOL_VERSION))))); + } + + private static Stream> getActorDefinitionsInUse(final DSLContext ctx) { + return ctx.select(ACTOR_DEFINITION.ID, ACTOR_DEFINITION.DOCKER_REPOSITORY, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.PROTOCOL_VERSION) + .from(ACTOR_DEFINITION) + .join(ACTOR).on(ACTOR.ACTOR_DEFINITION_ID.equal(ACTOR_DEFINITION.ID)) + .fetchStream(); + } + static void writeStandardSourceDefinition(final List configs, final DSLContext ctx) { final OffsetDateTime timestamp = OffsetDateTime.now(); configs.forEach((standardSourceDefinition) -> {