Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Airbyte Protocol V1 support. #20036

Merged
merged 19 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ffe076c
Add Airbyte Protocol V1 support.
gosusnp Dec 2, 2022
0612904
Fix VersionedAirbyteStreamFactoryTest
gosusnp Dec 2, 2022
7bbb282
Remove AirbyteMessageMigrationV0 example
gosusnp Dec 2, 2022
7b4379e
Add Protocol Version constants
gosusnp Dec 2, 2022
b369e6e
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
edgao Dec 17, 2022
ab52888
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
edgao Jan 6, 2023
c1d7736
🎉Updated normalization to handle new datatypes (#19721)
etsybaev Jan 6, 2023
ee150e3
Update airbyte protocol migration (#20745)
gosusnp Jan 10, 2023
5e819a4
Data types update: Implement protocol message migrations (#19240)
edgao Jan 11, 2023
0d2a02e
Merge remote-tracking branch 'origin/master' into gosusnp/platform-us…
gosusnp Jan 18, 2023
76a7de3
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 18, 2023
c3193ae
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 25, 2023
85b342e
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 26, 2023
89e239d
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 26, 2023
8889370
On-the-fly migrations of persisted catalogs (#21757)
gosusnp Jan 27, 2023
cfa20f1
Update protocol support range (#21996)
gosusnp Jan 27, 2023
cc28116
bump normalization version to 0.3.0
edgao Jan 30, 2023
b367f96
Add version check on normalization (#22048)
gosusnp Jan 30, 2023
595481c
Merge branch 'master' into gosusnp/platform-use-protocol-v1-the-quick…
gosusnp Jan 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.commons.protocol;

import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage;

/**
* Wraps message migration from a fixed version to the most recent version
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.migrations;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import jakarta.inject.Singleton;

/**
* Placeholder AirbyteMessage Migration from v0 to v1
*/
@Singleton
public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration<AirbyteMessage, io.airbyte.protocol.models.AirbyteMessage> {

@Override
public AirbyteMessage downgrade(io.airbyte.protocol.models.AirbyteMessage message) {
return Jsons.object(Jsons.jsonNode(message), AirbyteMessage.class);
}

@Override
public io.airbyte.protocol.models.AirbyteMessage upgrade(AirbyteMessage message) {
return Jsons.object(Jsons.jsonNode(message), io.airbyte.protocol.models.AirbyteMessage.class);
}

@Override
public Version getPreviousVersion() {
return AirbyteProtocolVersion.V0;
}

@Override
public Version getCurrentVersion() {
return AirbyteProtocolVersion.V1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import lombok.Getter;

public class AirbyteMessageGenericDeserializer<T> implements AirbyteMessageDeserializer<T> {

@Getter
final AirbyteVersion targetVersion;
final Version targetVersion;
final Class<T> typeClass;

public AirbyteMessageGenericDeserializer(final AirbyteVersion targetVersion, final Class<T> typeClass) {
public AirbyteMessageGenericDeserializer(final Version targetVersion, final Class<T> typeClass) {
this.targetVersion = targetVersion;
this.typeClass = typeClass;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
package io.airbyte.commons.protocol.serde;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
public class AirbyteMessageGenericSerializer<T> implements AirbyteMessageSerializer<T> {

@Getter
private final AirbyteVersion targetVersion;
private final Version targetVersion;

@Override
public String serialize(T message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

package io.airbyte.commons.protocol.serde;

import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import jakarta.inject.Singleton;

@Singleton
public class AirbyteMessageV0Deserializer extends AirbyteMessageGenericDeserializer<AirbyteMessage> {

public AirbyteMessageV0Deserializer() {
super(new AirbyteVersion("0.3.0"), AirbyteMessage.class);
super(AirbyteProtocolVersion.V0, AirbyteMessage.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

package io.airbyte.commons.protocol.serde;

import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import jakarta.inject.Singleton;

@Singleton
public class AirbyteMessageV0Serializer extends AirbyteMessageGenericSerializer<AirbyteMessage> {

public AirbyteMessageV0Serializer() {
super(new AirbyteVersion("0.3.0"));
super(AirbyteProtocolVersion.V0);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.serde;

import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.AirbyteMessage;
import jakarta.inject.Singleton;

@Singleton
public class AirbyteMessageV1Deserializer extends AirbyteMessageGenericDeserializer<AirbyteMessage> {

public AirbyteMessageV1Deserializer() {
super(AirbyteProtocolVersion.V1, AirbyteMessage.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.serde;

import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.AirbyteMessage;
import jakarta.inject.Singleton;

@Singleton
public class AirbyteMessageV1Serializer extends AirbyteMessageGenericSerializer<AirbyteMessage> {

public AirbyteMessageV1Serializer() {
super(AirbyteProtocolVersion.V1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class AirbyteMessageSerDeProviderMicronautTest {
@Test
void testSerDeInjection() {
// This should contain the list of all the supported majors of the airbyte protocol
final Set<String> expectedVersions = new HashSet<>(List.of("0"));
final Set<String> expectedVersions = new HashSet<>(List.of("0", "1"));

assertEquals(expectedVersions, serDeProvider.getDeserializerKeys());
assertEquals(expectedVersions, serDeProvider.getSerializerKeys());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.workers.internal;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigrator;
import io.airbyte.commons.protocol.serde.AirbyteMessageSerializer;
import io.airbyte.protocol.models.AirbyteMessage;
Expand All @@ -26,15 +25,9 @@ public VersionedAirbyteMessageBufferedWriter(final BufferedWriter writer,

@Override
public void write(final AirbyteMessage message) throws IOException {
final T downgradedMessage = migrator.downgrade(convert(message));
final T downgradedMessage = migrator.downgrade(message);
writer.write(serializer.serialize(downgradedMessage));
writer.newLine();
}

// TODO remove this conversion once we migrated default AirbyteMessage to be from a versioned
// namespace
private io.airbyte.protocol.models.v0.AirbyteMessage convert(final AirbyteMessage message) {
return Jsons.object(Jsons.jsonNode(message), io.airbyte.protocol.models.v0.AirbyteMessage.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,12 @@ final protected void initializeForProtocolVersion(final Version protocolVersion)
@Override
protected Stream<AirbyteMessage> toAirbyteMessage(final JsonNode json) {
try {
final io.airbyte.protocol.models.v0.AirbyteMessage message = migrator.upgrade(deserializer.deserialize(json));
return Stream.of(convert(message));
final AirbyteMessage message = migrator.upgrade(deserializer.deserialize(json));
return Stream.of(message);
} catch (final RuntimeException e) {
logger.warn("Failed to upgrade a message from version {}: {}", protocolVersion, Jsons.serialize(json), e);
return Stream.empty();
}
}

// TODO remove this conversion once we migrated default AirbyteMessage to be from a versioned
// namespace
private AirbyteMessage convert(final io.airbyte.protocol.models.v0.AirbyteMessage message) {
return Jsons.object(Jsons.jsonNode(message), AirbyteMessage.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import io.airbyte.commons.protocol.AirbyteMessageMigrator;
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigrationV0;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigrationV1;
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Deserializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Serializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageV1Deserializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageV1Serializer;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.AirbyteMessage;
import java.io.BufferedReader;
Expand All @@ -36,11 +38,11 @@ class VersionedAirbyteStreamFactoryTest {
@BeforeEach
void beforeEach() {
serDeProvider = spy(new AirbyteMessageSerDeProvider(
List.of(new AirbyteMessageV0Deserializer()),
List.of(new AirbyteMessageV0Serializer())));
List.of(new AirbyteMessageV0Deserializer(), new AirbyteMessageV1Deserializer()),
List.of(new AirbyteMessageV0Serializer(), new AirbyteMessageV1Serializer())));
serDeProvider.initialize();
final AirbyteMessageMigrator migrator = new AirbyteMessageMigrator(
List.of(new AirbyteMessageMigrationV0()));
List.of(new AirbyteMessageMigrationV1()));
migrator.initialize();
migratorFactory = spy(new AirbyteMessageVersionedMigratorFactory(migrator));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
public class AirbyteProtocolVersion {

public final static Version DEFAULT_AIRBYTE_PROTOCOL_VERSION = new Version("0.2.0");
public final static Version V0 = new Version("0.3.0");
public final static Version V1 = new Version("1.0.0");

public final static String AIRBYTE_PROTOCOL_VERSION_MAX_KEY_NAME = "airbyte_protocol_version_max";
public final static String AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME = "airbyte_protocol_version_min";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ public class EnvConfigs implements Configs {
// env variable names
public static final String AIRBYTE_ROLE = "AIRBYTE_ROLE";
public static final String AIRBYTE_VERSION = "AIRBYTE_VERSION";
public static final String AIRBYTE_PROTOCOL_VERSION_MAX = "AIRBYTE_PROTOCOL_VERSION_MAX";
public static final String AIRBYTE_PROTOCOL_VERSION_MIN = "AIRBYTE_PROTOCOL_VERSION_MIN";
public static final String INTERNAL_API_HOST = "INTERNAL_API_HOST";
public static final String AIRBYTE_API_AUTH_HEADER_NAME = "AIRBYTE_API_AUTH_HEADER_NAME";
public static final String AIRBYTE_API_AUTH_HEADER_VALUE = "AIRBYTE_API_AUTH_HEADER_VALUE";
Expand Down Expand Up @@ -202,6 +200,8 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAX_SYNC_WORKERS = 5;
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
private static final String DEFAULT_NETWORK = "host";
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX = new Version("1.0.0");
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MIN = new Version("0.0.0");
private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";

public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
Expand Down Expand Up @@ -296,12 +296,12 @@ public AirbyteVersion getAirbyteVersion() {

@Override
public Version getAirbyteProtocolVersionMax() {
return new Version(getEnvOrDefault(AIRBYTE_PROTOCOL_VERSION_MAX, "0.3.0"));
return DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX;
}

@Override
public Version getAirbyteProtocolVersionMin() {
return new Version(getEnvOrDefault(AIRBYTE_PROTOCOL_VERSION_MIN, "0.0.0"));
return DEFAULT_AIRBYTE_PROTOCOL_VERSION_MIN;
}

@Override
Expand Down