Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Airbyte Protocol V1 support. #20036

Merged
merged 19 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ffe076c
Add Airbyte Protocol V1 support.
gosusnp Dec 2, 2022
0612904
Fix VersionedAirbyteStreamFactoryTest
gosusnp Dec 2, 2022
7bbb282
Remove AirbyteMessageMigrationV0 example
gosusnp Dec 2, 2022
7b4379e
Add Protocol Version constants
gosusnp Dec 2, 2022
b369e6e
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
edgao Dec 17, 2022
ab52888
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
edgao Jan 6, 2023
c1d7736
🎉Updated normalization to handle new datatypes (#19721)
etsybaev Jan 6, 2023
ee150e3
Update airbyte protocol migration (#20745)
gosusnp Jan 10, 2023
5e819a4
Data types update: Implement protocol message migrations (#19240)
edgao Jan 11, 2023
0d2a02e
Merge remote-tracking branch 'origin/master' into gosusnp/platform-us…
gosusnp Jan 18, 2023
76a7de3
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 18, 2023
c3193ae
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 25, 2023
85b342e
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 26, 2023
89e239d
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 26, 2023
8889370
On-the-fly migrations of persisted catalogs (#21757)
gosusnp Jan 27, 2023
cfa20f1
Update protocol support range (#21996)
gosusnp Jan 27, 2023
cc28116
bump normalization version to 0.3.0
edgao Jan 30, 2023
b367f96
Add version check on normalization (#22048)
gosusnp Jan 30, 2023
595481c
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ on:
description: "Enable or disable tmate session for debug during helm ac tests"
type: choice
default: 'false'
options:
options:
- 'true'
- 'false'
required: false
Expand Down Expand Up @@ -305,7 +305,7 @@ jobs:
if: always()
run: |
gcs_bucket_name="dev-ab-ci-run-results"
filename=$(echo "${{ fromJSON( steps.connectors-test-results.outputs.json ).check_url }}" | sed 's@.*/@@')
filename=$(echo "${{ fromJSON( steps.connectors-test-results.outputs.json ).check_url }}" | sed 's@.*/@@')
echo "$filename"
gsutil -h "Cache-Control:public" cp connectors_base_results.jsonl "gs://$gcs_bucket_name/oss/$filename.jsonl"

Expand Down Expand Up @@ -699,7 +699,7 @@ jobs:
if: always()
run: |
gcs_bucket_name="dev-ab-ci-run-results"
filename=$(echo "${{ fromJSON( steps.platform-results.outputs.json ).check_url }}" | sed 's@.*/@@')
filename=$(echo "${{ fromJSON( steps.platform-results.outputs.json ).check_url }}" | sed 's@.*/@@')
echo "$filename"
gsutil -h "Cache-Control:public" cp platform_results.jsonl "gs://$gcs_bucket_name/oss/$filename.jsonl"

Expand Down Expand Up @@ -756,7 +756,7 @@ jobs:
github-token: ${{ env.PAT }}
label: ${{ needs.start-platform-build-runner.outputs.label }}
ec2-instance-id: ${{ needs.start-platform-build-runner.outputs.ec2-instance-id }}

## Kube Acceptance Tests
# Docker acceptance tests run as part of the build job.
# In case of self-hosted EC2 errors, remove this block.
Expand Down Expand Up @@ -919,7 +919,7 @@ jobs:
if: always()
run: |
gcs_bucket_name="dev-ab-ci-run-results"
filename=$(echo "${{ fromJSON( steps.kube-results.outputs.json ).check_url }}" | sed 's@.*/@@')
filename=$(echo "${{ fromJSON( steps.kube-results.outputs.json ).check_url }}" | sed 's@.*/@@')
echo "$filename"
gsutil -h "Cache-Control:public" cp kube_results.jsonl "gs://$gcs_bucket_name/oss/$filename.jsonl"

Expand Down Expand Up @@ -1036,7 +1036,7 @@ jobs:
- name: Fix EC-2 Runner
run: |
mkdir -p /actions-runner/_work/airbyte/airbyte && mkdir -p /actions-runner/_work/airbyte/airbyte/.kube

- name: Checkout Airbyte
uses: actions/checkout@v2
with:
Expand All @@ -1059,7 +1059,7 @@ jobs:

- uses: actions/setup-python@v4
with:
python-version: '3.9'
python-version: '3.9'

- uses: actions/setup-java@v1
with:
Expand Down Expand Up @@ -1087,11 +1087,11 @@ jobs:
sudo apt-get -y install tmate
attempt_limit: 3
attempt_delay: 2000 # in ms

- name: Start tmate session in background
if: inputs.debug_mode == 'true'
shell: bash
run: |
run: |
tmate -S /tmp/tmate.sock new-session -d # Launch tmate in a headless mode
tmate -S /tmp/tmate.sock wait tmate-ready # Blocks until the SSH connection is established
tmate -S /tmp/tmate.sock display -p '#{tmate_ssh}' # Prints the SSH connection string
Expand Down Expand Up @@ -1163,7 +1163,7 @@ jobs:
with:
name: Kubernetes Logs
path: /tmp/kubernetes_logs/*

- name: Upload test results to BuildPulse for flaky test detection
if: "!cancelled()" # Run this step even when the tests fail. Skip if the workflow is cancelled.
uses: Workshop64/buildpulse-action@main
Expand All @@ -1173,13 +1173,13 @@ jobs:
path: "/actions-runner/_work/airbyte/airbyte/*"
key: ${{ secrets.BUILDPULSE_ACCESS_KEY_ID }}
secret: ${{ secrets.BUILDPULSE_SECRET_ACCESS_KEY }}

- name: "Display logs of k3s"
if: failure()
shell: bash
run: |
journalctl -xeu k3s.service


# # In case of self-hosted EC2 errors, remove this block.
stop-helm-acceptance-test-runner:
Expand Down
1 change: 1 addition & 0 deletions airbyte-commons-protocol/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dependencies {
testImplementation libs.bundles.micronaut.test

implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-json-validation')
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
import io.airbyte.commons.protocol.migrations.MigrationContainer;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* AirbyteProtocol Message Migrator
Expand All @@ -25,104 +24,59 @@
@Singleton
public class AirbyteMessageMigrator {

private final List<AirbyteMessageMigration<?, ?>> migrationsToRegister;
private final SortedMap<String, AirbyteMessageMigration<?, ?>> migrations = new TreeMap<>();
private String mostRecentMajorVersion = "";
private final MigrationContainer<AirbyteMessageMigration<?, ?>> migrationContainer;

public AirbyteMessageMigrator(List<AirbyteMessageMigration<?, ?>> migrations) {
migrationsToRegister = migrations;
}

public AirbyteMessageMigrator() {
this(Collections.emptyList());
public AirbyteMessageMigrator(final List<AirbyteMessageMigration<?, ?>> migrations) {
migrationContainer = new MigrationContainer<>(migrations);
}

@PostConstruct
public void initialize() {
migrationsToRegister.forEach(this::registerMigration);
migrationContainer.initialize();
}

/**
* Downgrade a message from the most recent version to the target version by chaining all the
* required migrations
*/
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) {
if (target.getMajorVersion().equals(mostRecentMajorVersion)) {
return (PreviousVersion) message;
}

Object result = message;
Object[] selectedMigrations = selectMigrations(target).toArray();
for (int i = selectedMigrations.length; i > 0; --i) {
result = applyDowngrade((AirbyteMessageMigration<?, ?>) selectedMigrations[i - 1], result);
}
return (PreviousVersion) result;
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message,
final Version target,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrationContainer.downgrade(message, target, (migration, msg) -> applyDowngrade(migration, msg, configuredAirbyteCatalog));
}

/**
* Upgrade a message from the source version to the most recent version by chaining all the required
* migrations
*/
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) {
if (source.getMajorVersion().equals(mostRecentMajorVersion)) {
return (CurrentVersion) message;
}

Object result = message;
for (var migration : selectMigrations(source)) {
result = applyUpgrade(migration, result);
}
return (CurrentVersion) result;
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message,
final Version source,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrationContainer.upgrade(message, source, (migration, msg) -> applyUpgrade(migration, msg, configuredAirbyteCatalog));
}

public Version getMostRecentVersion() {
return new Version(mostRecentMajorVersion, "0", "0");
}

private Collection<AirbyteMessageMigration<?, ?>> selectMigrations(final Version version) {
final Collection<AirbyteMessageMigration<?, ?>> results = migrations.tailMap(version.getMajorVersion()).values();
if (results.isEmpty()) {
throw new RuntimeException("Unsupported migration version " + version.serialize());
}
return results;
return migrationContainer.getMostRecentVersion();
}

// Helper function to work around type casting
private <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message) {
return migration.downgrade((CurrentVersion) message);
private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migration.downgrade((CurrentVersion) message, configuredAirbyteCatalog);
}

// Helper function to work around type casting
private <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message) {
return migration.upgrade((PreviousVersion) message);
}

/**
* Store migration in a sorted map key by the major of the lower version of the migration.
*
* The goal is to be able to retrieve the list of migrations to apply to get to/from a given
* version. We are only keying on the lower version because the right side (most recent version of
* the migration range) is always current version.
*/
@VisibleForTesting
void registerMigration(final AirbyteMessageMigration<?, ?> migration) {
final String key = migration.getPreviousVersion().getMajorVersion();
if (!migrations.containsKey(key)) {
migrations.put(key, migration);
if (migration.getCurrentVersion().getMajorVersion().compareTo(mostRecentMajorVersion) > 0) {
mostRecentMajorVersion = migration.getCurrentVersion().getMajorVersion();
}
} else {
throw new RuntimeException("Trying to register a duplicated migration " + migration.getClass().getName());
}
private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
final Object message,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migration.upgrade((PreviousVersion) message, configuredAirbyteCatalog);
}

// Used for inspection of the injection
@VisibleForTesting
Set<String> getMigrationKeys() {
return migrations.keySet();
return migrationContainer.getMigrationKeys();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
package io.airbyte.commons.protocol;

import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Optional;

/**
* Wraps message migration from a fixed version to the most recent version
Expand All @@ -20,12 +22,12 @@ public AirbyteMessageVersionedMigrator(final AirbyteMessageMigrator migrator, fi
this.version = version;
}

public OriginalMessageType downgrade(final AirbyteMessage message) {
return migrator.downgrade(message, version);
public OriginalMessageType downgrade(final AirbyteMessage message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrator.downgrade(message, version, configuredAirbyteCatalog);
}

public AirbyteMessage upgrade(final OriginalMessageType message) {
return migrator.upgrade(message, version);
public AirbyteMessage upgrade(final OriginalMessageType message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return migrator.upgrade(message, version, configuredAirbyteCatalog);
}

public Version getVersion() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.commons.version.Version;
import jakarta.inject.Singleton;

/**
* Factory to build AirbyteMessageVersionedMigrator
*/
@Singleton
public class AirbyteProtocolVersionedMigratorFactory {

private final AirbyteMessageMigrator airbyteMessageMigrator;
private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator;

public AirbyteProtocolVersionedMigratorFactory(final AirbyteMessageMigrator airbyteMessageMigrator,
final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator) {
this.airbyteMessageMigrator = airbyteMessageMigrator;
this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator;
}

public <T> AirbyteMessageVersionedMigrator<T> getAirbyteMessageMigrator(final Version version) {
return new AirbyteMessageVersionedMigrator<>(airbyteMessageMigrator, version);
}

public final VersionedProtocolSerializer getProtocolSerializer(final Version version) {
return new VersionedProtocolSerializer(configuredAirbyteCatalogMigrator, version);
}

public Version getMostRecentVersion() {
return airbyteMessageMigrator.getMostRecentVersion();
}

}
Loading