Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement migration to create workspace_service_account table #11943

Merged
merged 4 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.59.004", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.65.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public enum ConfigSchema implements AirbyteConfig {
standardWorkspace -> standardWorkspace.getWorkspaceId().toString(),
"workspaceId"),

WORKSPACE_SERVICE_ACCOUNT("WorkspaceServiceAccount.yaml",
WorkspaceServiceAccount.class,
workspaceServiceAccount -> workspaceServiceAccount.getWorkspaceId().toString(),
"workspaceId"),

// source
STANDARD_SOURCE_DEFINITION("StandardSourceDefinition.yaml",
StandardSourceDefinition.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/WorkspaceServiceAccount.yaml
title: WorkspaceServiceAccount
description: service account attached to a workspace
type: object
required:
- workspaceId
- serviceAccountId
- serviceAccountEmail
- jsonCredential
- hmacKey
additionalProperties: false
properties:
workspaceId:
type: string
format: uuid
serviceAccountId:
type: string
serviceAccountEmail:
type: string
jsonCredential:
# Ref : io.airbyte.config.persistence.MockData#workspaceServiceAccounts() for sample data
description: Represents the JSON key generated for the service account
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
hmacKey:
# Ref : io.airbyte.config.persistence.MockData#workspaceServiceAccounts() for sample data
description: Represents the secret and access id of generated HMAC key for the service account
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
1 change: 1 addition & 0 deletions airbyte-config/persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation 'commons-io:commons-io:2.7'
implementation 'com.google.cloud:google-cloud-secretmanager:2.0.5'

testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation "org.testcontainers:postgresql:1.15.3"
testImplementation project(':airbyte-test-utils')
integrationTestJavaImplementation project(':airbyte-config:persistence')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.config.StandardSyncState;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.configs.jooq.enums.ActorType;
Expand Down Expand Up @@ -970,4 +971,15 @@ private Condition includeTombstones(final Field<Boolean> tombstoneField, final b
}
}

public WorkspaceServiceAccount getWorkspaceServiceAccountNoSecrets(final UUID workspaceId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.WORKSPACE_SERVICE_ACCOUNT, workspaceId.toString(), WorkspaceServiceAccount.class);
}

public void writeWorkspaceServiceAccountNoSecrets(final WorkspaceServiceAccount workspaceServiceAccount)
throws JsonValidationException, IOException {
persistence.writeConfig(ConfigSchema.WORKSPACE_SERVICE_ACCOUNT, workspaceServiceAccount.getWorkspaceId().toString(),
workspaceServiceAccount);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static io.airbyte.db.instance.configs.jooq.Tables.OPERATION;
import static io.airbyte.db.instance.configs.jooq.Tables.STATE;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE_SERVICE_ACCOUNT;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.select;

Expand Down Expand Up @@ -45,6 +46,7 @@
import io.airbyte.config.StandardSyncState;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
Expand Down Expand Up @@ -127,6 +129,8 @@ public <T> T getConfig(final AirbyteConfig configType, final String configId, fi
return (T) getActorCatalog(configId);
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
return (T) getActorCatalogFetchEvent(configId);
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
return (T) getWorkspaceServiceAccount(configId);
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand All @@ -138,6 +142,12 @@ private StandardWorkspace getStandardWorkspace(final String configId) throws IOE
return result.get(0).getConfig();
}

private WorkspaceServiceAccount getWorkspaceServiceAccount(final String configId) throws IOException, ConfigNotFoundException {
final List<ConfigWithMetadata<WorkspaceServiceAccount>> result = listWorkspaceServiceAccountWithMetadata(Optional.of(UUID.fromString(configId)));
validate(configId, result, ConfigSchema.WORKSPACE_SERVICE_ACCOUNT);
return result.get(0).getConfig();
}

private StandardSourceDefinition getStandardSourceDefinition(final String configId) throws IOException, ConfigNotFoundException {
final List<ConfigWithMetadata<StandardSourceDefinition>> result =
listStandardSourceDefinitionWithMetadata(Optional.of(UUID.fromString(configId)));
Expand Down Expand Up @@ -272,6 +282,8 @@ public <T> ConfigWithMetadata<T> getConfigWithMetadata(final AirbyteConfig confi
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogFetchEventWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listWorkspaceServiceAccountWithMetadata(configIdOpt), configType);
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -304,6 +316,8 @@ public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConf
listActorCatalogWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
listActorCatalogFetchEventWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
listWorkspaceServiceAccountWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -337,6 +351,33 @@ private List<ConfigWithMetadata<StandardWorkspace>> listStandardWorkspaceWithMet
return standardWorkspaces;
}

private List<ConfigWithMetadata<WorkspaceServiceAccount>> listWorkspaceServiceAccountWithMetadata() throws IOException {
return listWorkspaceServiceAccountWithMetadata(Optional.empty());
}

private List<ConfigWithMetadata<WorkspaceServiceAccount>> listWorkspaceServiceAccountWithMetadata(final Optional<UUID> configId)
throws IOException {
final Result<Record> result = database.query(ctx -> {
final SelectJoinStep<Record> query = ctx.select(asterisk()).from(WORKSPACE_SERVICE_ACCOUNT);
if (configId.isPresent()) {
return query.where(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID.eq(configId.get())).fetch();
}
return query.fetch();
});

final List<ConfigWithMetadata<WorkspaceServiceAccount>> workspaceServiceAccounts = new ArrayList<>();
for (final Record record : result) {
final WorkspaceServiceAccount workspaceServiceAccount = DbConverter.buildWorkspaceServiceAccount(record);
workspaceServiceAccounts.add(new ConfigWithMetadata<>(
record.get(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID).toString(),
ConfigSchema.WORKSPACE_SERVICE_ACCOUNT.name(),
record.get(WORKSPACE_SERVICE_ACCOUNT.CREATED_AT).toInstant(),
record.get(WORKSPACE_SERVICE_ACCOUNT.UPDATED_AT).toInstant(),
workspaceServiceAccount));
}
return workspaceServiceAccounts;
}

private List<ConfigWithMetadata<StandardSourceDefinition>> listStandardSourceDefinitionWithMetadata() throws IOException {
return listStandardSourceDefinitionWithMetadata(Optional.empty());
}
Expand Down Expand Up @@ -697,6 +738,8 @@ public <T> void writeConfig(final AirbyteConfig configType, final String configI
writeActorCatalog(Collections.singletonList((ActorCatalog) config));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
writeActorCatalogFetchEvent(Collections.singletonList((ActorCatalogFetchEvent) config));
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
writeWorkspaceServiceAccount(Collections.singletonList((WorkspaceServiceAccount) config));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -758,6 +801,44 @@ private void writeStandardWorkspace(final List<StandardWorkspace> configs, final
});
}

private void writeWorkspaceServiceAccount(final List<WorkspaceServiceAccount> configs) throws IOException {
database.transaction(ctx -> {
writeWorkspaceServiceAccount(configs, ctx);
return null;
});
}

private void writeWorkspaceServiceAccount(final List<WorkspaceServiceAccount> configs, final DSLContext ctx) {
final OffsetDateTime timestamp = OffsetDateTime.now();
configs.forEach((workspaceServiceAccount) -> {
final boolean isExistingConfig = ctx.fetchExists(select()
.from(WORKSPACE_SERVICE_ACCOUNT)
.where(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID.eq(workspaceServiceAccount.getWorkspaceId())));

if (isExistingConfig) {
ctx.update(WORKSPACE_SERVICE_ACCOUNT)
.set(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID, workspaceServiceAccount.getWorkspaceId())
.set(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_ID, workspaceServiceAccount.getServiceAccountId())
.set(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_EMAIL, workspaceServiceAccount.getServiceAccountEmail())
.set(WORKSPACE_SERVICE_ACCOUNT.JSON_CREDENTIAL, JSONB.valueOf(Jsons.serialize(workspaceServiceAccount.getJsonCredential())))
.set(WORKSPACE_SERVICE_ACCOUNT.HMAC_KEY, JSONB.valueOf(Jsons.serialize(workspaceServiceAccount.getHmacKey())))
.set(WORKSPACE_SERVICE_ACCOUNT.UPDATED_AT, timestamp)
.where(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID.eq(workspaceServiceAccount.getWorkspaceId()))
.execute();
} else {
ctx.insertInto(WORKSPACE_SERVICE_ACCOUNT)
.set(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID, workspaceServiceAccount.getWorkspaceId())
.set(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_ID, workspaceServiceAccount.getServiceAccountId())
.set(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_EMAIL, workspaceServiceAccount.getServiceAccountEmail())
.set(WORKSPACE_SERVICE_ACCOUNT.JSON_CREDENTIAL, JSONB.valueOf(Jsons.serialize(workspaceServiceAccount.getJsonCredential())))
.set(WORKSPACE_SERVICE_ACCOUNT.HMAC_KEY, JSONB.valueOf(Jsons.serialize(workspaceServiceAccount.getHmacKey())))
.set(WORKSPACE_SERVICE_ACCOUNT.CREATED_AT, timestamp)
.set(WORKSPACE_SERVICE_ACCOUNT.UPDATED_AT, timestamp)
.execute();
}
});
}

private void writeStandardSourceDefinition(final List<StandardSourceDefinition> configs) throws IOException {
database.transaction(ctx -> {
ConfigWriter.writeStandardSourceDefinition(configs, ctx);
Expand Down Expand Up @@ -1190,6 +1271,8 @@ public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T
writeActorCatalog(configs.values().stream().map(c -> (ActorCatalog) c).collect(Collectors.toList()));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
writeActorCatalogFetchEvent(configs.values().stream().map(c -> (ActorCatalogFetchEvent) c).collect(Collectors.toList()));
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
writeWorkspaceServiceAccount(configs.values().stream().map(c -> (WorkspaceServiceAccount) c).collect(Collectors.toList()));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -1221,6 +1304,8 @@ public void deleteConfig(final AirbyteConfig configType, final String configId)
deleteConfig(ACTOR_CATALOG, ACTOR_CATALOG.ID, UUID.fromString(configId));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
deleteConfig(ACTOR_CATALOG_FETCH_EVENT, ACTOR_CATALOG_FETCH_EVENT.ID, UUID.fromString(configId));
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
deleteConfig(WORKSPACE_SERVICE_ACCOUNT, WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID, UUID.fromString(configId));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -1278,6 +1363,7 @@ public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final
ctx.truncate(STATE).restartIdentity().cascade().execute();
ctx.truncate(ACTOR_CATALOG).restartIdentity().cascade().execute();
ctx.truncate(ACTOR_CATALOG_FETCH_EVENT).restartIdentity().cascade().execute();
ctx.truncate(WORKSPACE_SERVICE_ACCOUNT).restartIdentity().cascade().execute();

if (configs.containsKey(ConfigSchema.STANDARD_WORKSPACE)) {
configs.get(ConfigSchema.STANDARD_WORKSPACE).map(c -> (StandardWorkspace) c)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_OAUTH_PARAMETER;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE_SERVICE_ACCOUNT;

import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
Expand All @@ -26,6 +27,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.IOException;
Expand Down Expand Up @@ -147,4 +149,15 @@ public static ActorCatalog buildActorCatalog(final Record record) {
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));
}

public static WorkspaceServiceAccount buildWorkspaceServiceAccount(final Record record) {
return new WorkspaceServiceAccount()
.withWorkspaceId(record.get(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID))
.withServiceAccountId(record.get(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_ID))
.withServiceAccountEmail(record.get(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_EMAIL))
.withJsonCredential(record.get(WORKSPACE_SERVICE_ACCOUNT.JSON_CREDENTIAL) == null ? null
: Jsons.deserialize(record.get(WORKSPACE_SERVICE_ACCOUNT.JSON_CREDENTIAL).data()))
.withHmacKey(record.get(WORKSPACE_SERVICE_ACCOUNT.HMAC_KEY) == null ? null
: Jsons.deserialize(record.get(WORKSPACE_SERVICE_ACCOUNT.HMAC_KEY).data()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -96,4 +97,18 @@ private void hydrateValuesIfKeyPresent(final String key, final Map<String, Strea
}
}

public WorkspaceServiceAccount getWorkspaceServiceAccountWithSecrets(final UUID workspaceId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final WorkspaceServiceAccount workspaceServiceAccount = configRepository.getWorkspaceServiceAccountNoSecrets(workspaceId);

final JsonNode jsonCredential =
workspaceServiceAccount.getJsonCredential() != null ? secretsHydrator.hydrateSecretCoordinate(workspaceServiceAccount.getJsonCredential())
: null;

final JsonNode hmacKey =
workspaceServiceAccount.getHmacKey() != null ? secretsHydrator.hydrateSecretCoordinate(workspaceServiceAccount.getHmacKey()) : null;

return Jsons.clone(workspaceServiceAccount).withJsonCredential(jsonCredential).withHmacKey(hmacKey);
}

}
Loading