Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 677b477
Author: pmossman <parker@airbyte.io>
Date:   Fri Aug 5 15:24:10 2022 -0700

    format

commit 679b602
Author: pmossman <parker@airbyte.io>
Date:   Fri Aug 5 15:18:03 2022 -0700

    move converters to module that worker can access, convert statePersistence calls to API calls, convert statePersistence helper to local private method

commit 628931d
Author: pmossman <parker@airbyte.io>
Date:   Fri Aug 5 10:37:00 2022 -0700

    add createOrUpdateState API endpoint

commit bcf424d
Author: pmossman <parker@airbyte.io>
Date:   Fri Aug 5 13:42:16 2022 -0700

    add AirbyteApiClient to WorkerApp for data plane workers to use
  • Loading branch information
pmossman committed Aug 10, 2022
1 parent a1f30b6 commit c0b526c
Show file tree
Hide file tree
Showing 17 changed files with 575 additions and 114 deletions.
34 changes: 34 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,30 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/state/create_or_update:
post:
tags:
- connection
- internal
summary: Create or update the state for a connection. INTERNAL-ONLY.
operationId: createOrUpdateState
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionStateCreateOrUpdate"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionState"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/connections/search:
post:
tags:
Expand Down Expand Up @@ -3268,6 +3292,16 @@ components:
sourceCatalogId:
type: string
format: uuid
ConnectionStateCreateOrUpdate:
type: object
required:
- connectionId
- connectionState
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
connectionState:
$ref: "#/components/schemas/ConnectionState"
ConnectionUpdate:
type: object
required:
Expand Down
1 change: 1 addition & 0 deletions airbyte-config/config-models/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ plugins {
dependencies {
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-api')
implementation project(':airbyte-commons')
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,20 @@ public interface Configs {
*/
int getAirbyteApiPort();

/**
* Define the header name used to authenticate with the Airbyte API
*
* @return
*/
String getAirbyteApiAuthHeaderName();

/**
* Define the header value used to authenticate with the Airbyte API
*
* @return
*/
String getAirbyteApiAuthHeaderValue();

/**
* Define the url the Airbyte Webapp is hosted at. Airbyte services use this information.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class EnvConfigs implements Configs {
public static final String AIRBYTE_ROLE = "AIRBYTE_ROLE";
public static final String AIRBYTE_VERSION = "AIRBYTE_VERSION";
public static final String INTERNAL_API_HOST = "INTERNAL_API_HOST";
public static final String AIRBYTE_API_AUTH_HEADER_NAME = "AIRBYTE_API_AUTH_HEADER_NAME";
public static final String AIRBYTE_API_AUTH_HEADER_VALUE = "AIRBYTE_API_AUTH_HEADER_VALUE";
public static final String WORKER_ENVIRONMENT = "WORKER_ENVIRONMENT";
public static final String SPEC_CACHE_BUCKET = "SPEC_CACHE_BUCKET";
public static final String WORKSPACE_ROOT = "WORKSPACE_ROOT";
Expand Down Expand Up @@ -474,6 +476,16 @@ public int getAirbyteApiPort() {
return Integer.parseInt(getEnsureEnv(INTERNAL_API_HOST).split(":")[1]);
}

@Override
public String getAirbyteApiAuthHeaderName() {
return getEnvOrDefault(AIRBYTE_API_AUTH_HEADER_NAME, "");
}

@Override
public String getAirbyteApiAuthHeaderValue() {
return getEnvOrDefault(AIRBYTE_API_AUTH_HEADER_VALUE, "");
}

@Override
public String getWebappUrl() {
return getEnsureEnv(WEBAPP_URL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.converters;
package io.airbyte.config.helpers;

import io.airbyte.api.model.generated.StreamDescriptor;

/**
* Utilities that convert protocol types into API representations of the protocol type.
* Utilities that convert protocol types into API or client representations of the protocol type.
*/
public class ProtocolConverters {

public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.models.StreamDescriptor protocolStreamDescriptor) {
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
}

public static io.airbyte.api.client.model.generated.StreamDescriptor streamDescriptorToClient(final io.airbyte.protocol.models.StreamDescriptor protocolStreamDescriptor) {
return new io.airbyte.api.client.model.generated.StreamDescriptor()
.name(protocolStreamDescriptor.getName())
.namespace(protocolStreamDescriptor.getNamespace());
}

public static io.airbyte.protocol.models.StreamDescriptor streamDescriptorToProtocol(final StreamDescriptor apiStreamDescriptor) {
return new io.airbyte.protocol.models.StreamDescriptor().withName(apiStreamDescriptor.getName())
.withNamespace(apiStreamDescriptor.getNamespace());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.helpers;

import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.GlobalState;
import io.airbyte.api.model.generated.StreamState;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStreamState;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;

public class StateConverter {

/**
* Converts internal representation of state to API representation
*
* @param connectionId connection associated with the state
* @param stateWrapper internal state representation to convert
* @return api representation of state
*/
public static ConnectionState toApi(final UUID connectionId, final @Nullable StateWrapper stateWrapper) {
return new ConnectionState()
.connectionId(connectionId)
.stateType(convertStateTypeToApi(stateWrapper))
.state(stateWrapper != null ? stateWrapper.getLegacyState() : null)
.globalState(globalStateToApi(stateWrapper).orElse(null))
.streamState(streamStateToApi(stateWrapper).orElse(null));
}

/**
* Converts internal representation of state to client representation
*
* @param connectionId connection associated with the state
* @param stateWrapper internal state representation to convert
* @return client representation of state
*/
public static io.airbyte.api.client.model.generated.ConnectionState toClient(final UUID connectionId, final @Nullable StateWrapper stateWrapper) {
return new io.airbyte.api.client.model.generated.ConnectionState()
.connectionId(connectionId)
.stateType(convertStateTypeToClient(stateWrapper))
.state(stateWrapper != null ? stateWrapper.getLegacyState() : null)
.globalState(globalStateToClient(stateWrapper).orElse(null))
.streamState(streamStateToClient(stateWrapper).orElse(null));
}

/**
* Converts API representation of state to internal representation
*
* @param apiConnectionState api representation of state
* @return internal representation of state
*/
public static StateWrapper toInternal(final @Nullable ConnectionState apiConnectionState) {
return new StateWrapper()
.withStateType(convertStateTypeToInternal(apiConnectionState).orElse(null))
.withGlobal(globalStateToInternal(apiConnectionState).orElse(null))
.withLegacyState(apiConnectionState != null ? apiConnectionState.getState() : null)
.withStateMessages(streamStateToInternal(apiConnectionState).orElse(null));

}

/**
* Convert to API representation of state type. API has an additional type (NOT_SET). This
* represents the case where no state is saved so we do not know the state type.
*
* @param stateWrapper state to convert
* @return api representation of state type
*/
private static ConnectionStateType convertStateTypeToApi(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper == null || stateWrapper.getStateType() == null) {
return ConnectionStateType.NOT_SET;
} else {
return Enums.convertTo(stateWrapper.getStateType(), ConnectionStateType.class);
}
}

/**
* Convert to client representation of state type. The client model has an additional type
* (NOT_SET). This represents the case where no state is saved so we do not know the state type.
*
* @param stateWrapper state to convert
* @return client representation of state type
*/
private static io.airbyte.api.client.model.generated.ConnectionStateType convertStateTypeToClient(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper == null || stateWrapper.getStateType() == null) {
return io.airbyte.api.client.model.generated.ConnectionStateType.NOT_SET;
} else {
return Enums.convertTo(stateWrapper.getStateType(), io.airbyte.api.client.model.generated.ConnectionStateType.class);
}
}

/**
* Convert to internal representation of state type, if set. Otherise, empty optional
*
* @param connectionState API state to convert.
* @return internal state type, if set. Otherwise, empty optional.
*/
private static Optional<StateType> convertStateTypeToInternal(final @Nullable ConnectionState connectionState) {
if (connectionState == null || connectionState.getStateType().equals(ConnectionStateType.NOT_SET)) {
return Optional.empty();
} else {
return Optional.of(Enums.convertTo(connectionState.getStateType(), StateType.class));
}
}

/**
* If wrapper is of type global state, returns API representation of global state. Otherwise, empty
* optional.
*
* @param stateWrapper state wrapper to extract from
* @return api representation of global state if state wrapper is type global. Otherwise, empty
* optional.
*/
private static Optional<GlobalState> globalStateToApi(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper != null
&& stateWrapper.getStateType() == StateType.GLOBAL
&& stateWrapper.getGlobal() != null
&& stateWrapper.getGlobal().getGlobal() != null) {
return Optional.of(new GlobalState()
.sharedState(stateWrapper.getGlobal().getGlobal().getSharedState())
.streamStates(stateWrapper.getGlobal().getGlobal().getStreamStates()
.stream()
.map(StateConverter::streamStateStructToApi)
.toList()));
} else {
return Optional.empty();
}
}

/**
* If wrapper is of type global state, returns client representation of global state. Otherwise,
* empty optional.
*
* @param stateWrapper state wrapper to extract from
* @return client representation of global state if state wrapper is type global. Otherwise, empty
* optional.
*/
private static Optional<io.airbyte.api.client.model.generated.GlobalState> globalStateToClient(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper != null
&& stateWrapper.getStateType() == StateType.GLOBAL
&& stateWrapper.getGlobal() != null
&& stateWrapper.getGlobal().getGlobal() != null) {
return Optional.of(new io.airbyte.api.client.model.generated.GlobalState()
.sharedState(stateWrapper.getGlobal().getGlobal().getSharedState())
.streamStates(stateWrapper.getGlobal().getGlobal().getStreamStates()
.stream()
.map(StateConverter::streamStateStructToClient)
.toList()));
} else {
return Optional.empty();
}
}

/**
* If API state is of type global, returns internal representation of global state. Otherwise, empty
* optional.
*
* @param connectionState API state representation to extract from
* @return global state message if API state is of type global. Otherwise, empty optional.
*/
private static Optional<AirbyteStateMessage> globalStateToInternal(final @Nullable ConnectionState connectionState) {
if (connectionState != null
&& connectionState.getStateType() == ConnectionStateType.GLOBAL
&& connectionState.getGlobalState() != null) {
return Optional.of(new AirbyteStateMessage()
.withGlobal(new AirbyteGlobalState()
.withSharedState(connectionState.getGlobalState().getSharedState())
.withStreamStates(connectionState.getGlobalState().getStreamStates()
.stream()
.map(StateConverter::streamStateStructToInternal)
.toList())));
} else {
return Optional.empty();
}
}

/**
* If wrapper is of type stream state, returns API representation of stream state. Otherwise, empty
* optional.
*
* @param stateWrapper state wrapper to extract from
* @return api representation of stream state if state wrapper is type stream. Otherwise, empty
* optional.
*/
private static Optional<List<StreamState>> streamStateToApi(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper != null && stateWrapper.getStateType() == StateType.STREAM && stateWrapper.getStateMessages() != null) {
return Optional.ofNullable(stateWrapper.getStateMessages()
.stream()
.map(AirbyteStateMessage::getStream)
.map(StateConverter::streamStateStructToApi)
.toList());
} else {
return Optional.empty();
}
}

/**
* If wrapper is of type stream state, returns client representation of stream state. Otherwise,
* empty optional.
*
* @param stateWrapper state wrapper to extract from
* @return client representation of stream state if state wrapper is type stream. Otherwise, empty
* optional.
*/
private static Optional<List<io.airbyte.api.client.model.generated.StreamState>> streamStateToClient(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper != null && stateWrapper.getStateType() == StateType.STREAM && stateWrapper.getStateMessages() != null) {
return Optional.ofNullable(stateWrapper.getStateMessages()
.stream()
.map(AirbyteStateMessage::getStream)
.map(StateConverter::streamStateStructToClient)
.toList());
} else {
return Optional.empty();
}
}

/**
* If API state is of type stream, returns internal representation of stream state. Otherwise, empty
* optional.
*
* @param connectionState API representation of state to extract from
* @return internal representation of stream state if API state representation is of type stream.
* Otherwise, empty optional.
*/
private static Optional<List<AirbyteStateMessage>> streamStateToInternal(final @Nullable ConnectionState connectionState) {
if (connectionState != null && connectionState.getStateType() == ConnectionStateType.STREAM && connectionState.getStreamState() != null) {
return Optional.ofNullable(connectionState.getStreamState()
.stream()
.map(StateConverter::streamStateStructToInternal)
.map(s -> new AirbyteStateMessage().withStream(s))
.toList());
} else {
return Optional.empty();
}
}

private static StreamState streamStateStructToApi(final AirbyteStreamState streamState) {
return new StreamState()
.streamDescriptor(ProtocolConverters.streamDescriptorToApi(streamState.getStreamDescriptor()))
.streamState(streamState.getStreamState());
}

private static io.airbyte.api.client.model.generated.StreamState streamStateStructToClient(final AirbyteStreamState streamState) {
return new io.airbyte.api.client.model.generated.StreamState()
.streamDescriptor(ProtocolConverters.streamDescriptorToClient(streamState.getStreamDescriptor()))
.streamState(streamState.getStreamState());
}

private static AirbyteStreamState streamStateStructToInternal(final StreamState streamState) {
return new AirbyteStreamState()
.withStreamDescriptor(ProtocolConverters.streamDescriptorToProtocol(streamState.getStreamDescriptor()))
.withStreamState(streamState.getStreamState());
}

}
Loading

0 comments on commit c0b526c

Please sign in to comment.