Set resource limits for connector definitions: api layer #10482

Merged: 7 commits, Feb 23, 2022. Changes from all commits.
46 changes: 46 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
@@ -2124,6 +2124,8 @@ components:
format: uri
icon:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
SourceDefinitionUpdate:
type: object
description: Update the SourceDefinition. Currently, the only allowed attribute to update is the default docker image version.
@@ -2135,6 +2137,8 @@
$ref: "#/components/schemas/SourceDefinitionId"
dockerImageTag:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
SourceDefinitionRead:
type: object
required:
@@ -2162,6 +2166,8 @@
description: The date when this connector was first released, in yyyy-mm-dd format.
type: string
format: date
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
SourceDefinitionReadList:
type: object
required:
@@ -2393,6 +2399,8 @@
format: uri
icon:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
DestinationDefinitionUpdate:
type: object
required:
@@ -2403,6 +2411,8 @@
$ref: "#/components/schemas/DestinationDefinitionId"
dockerImageTag:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
DestinationDefinitionRead:
type: object
required:
@@ -2431,6 +2441,8 @@
description: The date when this connector was first released, in yyyy-mm-dd format.
type: string
format: date
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
DestinationDefinitionReadList:
type: object
required:
@@ -3475,6 +3487,40 @@ components:
$ref: "#/components/schemas/ConnectionStateObject"
ConnectionStateObject:
type: object
ActorDefinitionResourceRequirements:
description: actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overridden by the job type specific configurations. if not set, the platform will use defaults. these values will be overridden by configuration at the connection level.
type: object
additionalProperties: false
properties:
default:
"$ref": "#/definitions/ResourceRequirements"
jobSpecific:
type: array
items:
"$ref": "#/definitions/JobTypeResourceLimit"
JobTypeResourceLimit:
description: sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set.
type: object
additionalProperties: false
required:
- jobType
- resourceRequirements
properties:
jobType:
"$ref": "#/definitions/JobType"
resourceRequirements:
"$ref": "#/definitions/ResourceRequirements"
JobType:
description: enum that describes the different types of jobs that the platform runs.
type: string
enum:
- get_spec
- check_connection
- discover_schema
- sync
- reset_connection
- connection_updater
- replicate
ResourceRequirements:
description: optional resource requirements to run workers (blank for unbounded allocations)
type: object
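Taken together, these schemas let clients attach resource requirements to connector definition create and update requests. As a rough illustration, the generated API models for these schemas could be populated as follows. This is a minimal sketch: the generated class names, the fluent setters, and the enum constant JobType.SYNC are assumptions about how the generator maps the spec, not something shown in this diff (the setter names do match those used in ApiPojoConverters later in the PR).

import io.airbyte.api.model.ActorDefinitionResourceRequirements;
import io.airbyte.api.model.JobType;
import io.airbyte.api.model.JobTypeResourceLimit;
import io.airbyte.api.model.ResourceRequirements;
import java.util.List;

// Mirrors the Snowflake seed entry added later in this PR: a higher default cpu for
// check/discover, with a lower-cpu / higher-memory override for sync jobs.
final ActorDefinitionResourceRequirements resourceReqs = new ActorDefinitionResourceRequirements()
    ._default(new ResourceRequirements()
        .cpuRequest("1.0")
        .cpuLimit("1.0")
        .memoryRequest("300Mi")
        .memoryLimit("300Mi"))
    .jobSpecific(List.of(new JobTypeResourceLimit()
        .jobType(JobType.SYNC)
        .resourceRequirements(new ResourceRequirements()
            .cpuRequest("0.5")
            .cpuLimit("0.5")
            .memoryRequest("600Mi")
            .memoryLimit("600Mi"))));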
YamlSeedConfigPersistence.java
@@ -14,6 +14,7 @@
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.yaml.Yamls;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.AirbyteConfigValidator;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.config.persistence.ConfigNotFoundException;
@@ -57,15 +58,19 @@ private YamlSeedConfigPersistence(final Class<?> seedResourceClass) throws IOExc
final Map<String, JsonNode> fullSourceDefinitionConfigs = sourceDefinitionConfigs.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, e -> {
final JsonNode withTombstone = addMissingTombstoneField(e.getValue());
return mergeSpecIntoDefinition(withTombstone, sourceSpecConfigs);
final JsonNode output = mergeSpecIntoDefinition(withTombstone, sourceSpecConfigs);
AirbyteConfigValidator.AIRBYTE_CONFIG_VALIDATOR.ensureAsRuntime(ConfigSchema.STANDARD_SOURCE_DEFINITION, output);
return output;
}));

final Map<String, JsonNode> destinationDefinitionConfigs = getConfigs(seedResourceClass, SeedType.STANDARD_DESTINATION_DEFINITION);
final Map<String, JsonNode> destinationSpecConfigs = getConfigs(seedResourceClass, SeedType.DESTINATION_SPEC);
final Map<String, JsonNode> fullDestinationDefinitionConfigs = destinationDefinitionConfigs.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, e -> {
final JsonNode withTombstone = addMissingTombstoneField(e.getValue());
return mergeSpecIntoDefinition(withTombstone, destinationSpecConfigs);
final JsonNode output = mergeSpecIntoDefinition(withTombstone, destinationSpecConfigs);
AirbyteConfigValidator.AIRBYTE_CONFIG_VALIDATOR.ensureAsRuntime(ConfigSchema.STANDARD_DESTINATION_DEFINITION, output);
return output;
}));

this.allSeedConfigs = ImmutableMap.<SeedType, Map<String, JsonNode>>builder()
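The effect of wiring the validator into the seed loader is that a seed entry which does not conform to its config schema now fails at load time instead of being written to the database as-is. A minimal sketch of that failure mode, using the same validator call the loader makes; the JSON literal is a deliberately invalid, made-up entry, and Jsons is assumed to be the commons JSON helper used elsewhere in the codebase:

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfigValidator;
import io.airbyte.config.ConfigSchema;

// Missing required fields (and, now that additionalProperties is false, unknown ones)
// cause ensureAsRuntime to throw instead of letting the entry slip through silently.
final JsonNode invalidSeedEntry = Jsons.deserialize("{\"name\": \"Broken Destination\"}");
AirbyteConfigValidator.AIRBYTE_CONFIG_VALIDATOR
    .ensureAsRuntime(ConfigSchema.STANDARD_DESTINATION_DEFINITION, invalidSeedEntry); // throws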
seed/destination_definitions.yaml
@@ -188,6 +188,22 @@
dockerImageTag: 0.4.15
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
resourceRequirements:
# this is our first example of setting these requirements. they are guesses, not data driven.
# setting default cpu higher, because we have found that check and discover can be cpu constrained for dbs.
default:
cpu_limit: "1.0"
cpu_request: "1.0"
Comment on lines +195 to +196
Contributor: Why are the default cpu requests higher than the sync job cpu requests?
Contributor Author: hah. good question. i'll add a comment explaining.

memory_limit: "300Mi"
memory_request: "300Mi"
jobSpecific:
# sync jobs are generally IO and memory bound and not cpu.
- jobType: sync
resourceRequirements:
cpu_limit: "0.5"
cpu_request: "0.5"
memory_limit: "600Mi"
memory_request: "600Mi"
- name: MariaDB ColumnStore
destinationDefinitionId: 294a4790-429b-40ae-9516-49826b9702e1
dockerRepository: airbyte/destination-mariadb-columnstore
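Given this entry and the precedence spelled out in the API schema (job-specific values override the default, and connection-level configuration overrides both), a check_connection job for this destination would run with cpu 1.0 and 300Mi of memory, while a sync job would run with cpu 0.5 and 600Mi, unless a connection overrides them. A hypothetical helper sketching that resolution with the getters added in this PR; the platform's actual resolution logic is outside this diff:

import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.JobTypeResourceLimit;
import io.airbyte.config.JobTypeResourceLimit.JobType;
import io.airbyte.config.ResourceRequirements;

static ResourceRequirements resolveForJobType(final ActorDefinitionResourceRequirements defReqs, final JobType jobType) {
  if (defReqs == null) {
    return null; // no definition-level requirements; platform defaults apply
  }
  if (defReqs.getJobSpecific() != null) {
    for (final JobTypeResourceLimit limit : defReqs.getJobSpecific()) {
      if (limit.getJobType() == jobType) {
        return limit.getResourceRequirements(); // job-specific override wins over the default
      }
    }
  }
  return defReqs.getDefault(); // otherwise the definition-wide default (possibly null)
}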
AirbyteConfigValidator.java
@@ -9,6 +9,8 @@

public class AirbyteConfigValidator extends AbstractSchemaValidator<ConfigSchema> {

public static AirbyteConfigValidator AIRBYTE_CONFIG_VALIDATOR = new AirbyteConfigValidator();

@Override
public Path getSchemaPath(final ConfigSchema configType) {
return configType.getConfigSchemaFile().toPath();
ActorDefinitionResourceRequirements.yaml
@@ -4,7 +4,8 @@
title: ActorDefinitionResourceRequirements
description: actor definition specific resource requirements
type: object
additionalProperties: true
# set to false because we need the validations on seeds to be strict. otherwise, we will just add whatever is in the seed file into the db.
additionalProperties: false
properties:
default:
description: if set, these are the requirements that should be set for ALL jobs run for this actor definition.
@@ -17,8 +18,10 @@ definitions:
JobTypeResourceLimit:
description: sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set.
type: object
# set to false because we need the validations on seeds to be strict. otherwise, we will just add whatever is in the seed file into the db.
additionalProperties: false
required:
- jobtype
- jobType
- resourceRequirements
properties:
jobType:
ResourceRequirements.yaml
@@ -4,7 +4,7 @@
title: ResourceRequirements
description: generic configuration for pod resource requirements
type: object
additionalProperties: true
additionalProperties: false
properties:
# todo (cgardens) - should be camel case for consistency.
cpu_request:
ApiPojoConverters.java
@@ -4,33 +4,77 @@

package io.airbyte.server.converters;

import io.airbyte.api.model.ActorDefinitionResourceRequirements;
import io.airbyte.api.model.ConnectionRead;
import io.airbyte.api.model.ConnectionSchedule;
import io.airbyte.api.model.ConnectionStatus;
import io.airbyte.api.model.ConnectionUpdate;
import io.airbyte.api.model.JobType;
import io.airbyte.api.model.JobTypeResourceLimit;
import io.airbyte.api.model.ResourceRequirements;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
import io.airbyte.workers.helper.CatalogConverter;
import java.util.stream.Collectors;

public class ApiPojoConverters {

public static io.airbyte.config.ResourceRequirements resourceRequirementsToInternal(final ResourceRequirements resourceRequirements) {
public static io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqsToInternal(final ActorDefinitionResourceRequirements actorDefResourceReqs) {
if (actorDefResourceReqs == null) {
return null;
}

return new io.airbyte.config.ActorDefinitionResourceRequirements()
.withDefault(actorDefResourceReqs.getDefault() == null ? null : resourceRequirementsToInternal(actorDefResourceReqs.getDefault()))
.withJobSpecific(actorDefResourceReqs.getJobSpecific() == null ? null
: actorDefResourceReqs.getJobSpecific()
.stream()
.map(jobSpecific -> new io.airbyte.config.JobTypeResourceLimit()
.withJobType(toInternalJobType(jobSpecific.getJobType()))
.withResourceRequirements(resourceRequirementsToInternal(jobSpecific.getResourceRequirements())))
.collect(Collectors.toList()));
}

public static ActorDefinitionResourceRequirements actorDefResourceReqsToApi(final io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqs) {
if (actorDefResourceReqs == null) {
return null;
}

return new ActorDefinitionResourceRequirements()
._default(actorDefResourceReqs.getDefault() == null ? null : resourceRequirementsToApi(actorDefResourceReqs.getDefault()))
.jobSpecific(actorDefResourceReqs.getJobSpecific() == null ? null
: actorDefResourceReqs.getJobSpecific()
.stream()
.map(jobSpecific -> new JobTypeResourceLimit()
.jobType(toApiJobType(jobSpecific.getJobType()))
.resourceRequirements(resourceRequirementsToApi(jobSpecific.getResourceRequirements())))
.collect(Collectors.toList()));
}

public static io.airbyte.config.ResourceRequirements resourceRequirementsToInternal(final ResourceRequirements resourceReqs) {
if (resourceReqs == null) {
return null;
}

return new io.airbyte.config.ResourceRequirements()
.withCpuRequest(resourceRequirements.getCpuRequest())
.withCpuLimit(resourceRequirements.getCpuLimit())
.withMemoryRequest(resourceRequirements.getMemoryRequest())
.withMemoryLimit(resourceRequirements.getMemoryLimit());
.withCpuRequest(resourceReqs.getCpuRequest())
.withCpuLimit(resourceReqs.getCpuLimit())
.withMemoryRequest(resourceReqs.getMemoryRequest())
.withMemoryLimit(resourceReqs.getMemoryLimit());
}

public static ResourceRequirements resourceRequirementsToApi(final io.airbyte.config.ResourceRequirements resourceRequirements) {
public static ResourceRequirements resourceRequirementsToApi(final io.airbyte.config.ResourceRequirements resourceReqs) {
if (resourceReqs == null) {
return null;
}

return new ResourceRequirements()
.cpuRequest(resourceRequirements.getCpuRequest())
.cpuLimit(resourceRequirements.getCpuLimit())
.memoryRequest(resourceRequirements.getMemoryRequest())
.memoryLimit(resourceRequirements.getMemoryLimit());
.cpuRequest(resourceReqs.getCpuRequest())
.cpuLimit(resourceReqs.getCpuLimit())
.memoryRequest(resourceReqs.getMemoryRequest())
.memoryLimit(resourceReqs.getMemoryLimit());
}

public static io.airbyte.config.StandardSync connectionUpdateToInternal(final ConnectionUpdate update) {
@@ -90,6 +134,14 @@ public static ConnectionRead internalToConnectionRead(final StandardSync standar
return connectionRead;
}

public static JobType toApiJobType(final io.airbyte.config.JobTypeResourceLimit.JobType jobType) {
return Enums.convertTo(jobType, JobType.class);
}

public static io.airbyte.config.JobTypeResourceLimit.JobType toInternalJobType(final JobType jobType) {
return Enums.convertTo(jobType, io.airbyte.config.JobTypeResourceLimit.JobType.class);
}

public static ConnectionSchedule.TimeUnitEnum toApiTimeUnit(final Schedule.TimeUnit apiTimeUnit) {
return Enums.convertTo(apiTimeUnit, ConnectionSchedule.TimeUnitEnum.class);
}
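These converters are deliberately null-tolerant, so definitions and update requests that carry no resource requirements flow through unchanged. A short usage sketch, assuming an API-side model like the one built in the earlier example:

// API -> internal (config) model, e.g. when persisting a definition create/update ...
final io.airbyte.config.ActorDefinitionResourceRequirements internalReqs =
    ApiPojoConverters.actorDefResourceReqsToInternal(resourceReqs);
// ... and internal -> API model, e.g. when building a *DefinitionRead response.
final io.airbyte.api.model.ActorDefinitionResourceRequirements apiReqs =
    ApiPojoConverters.actorDefResourceReqsToApi(internalReqs);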
DestinationDefinitionsHandler.java
@@ -16,12 +16,14 @@
import io.airbyte.api.model.ReleaseStage;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.server.converters.ApiPojoConverters;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InternalServerKnownException;
import io.airbyte.server.services.AirbyteGithubStore;
@@ -77,7 +79,8 @@ static DestinationDefinitionRead buildDestinationDefinitionRead(final StandardDe
.documentationUrl(new URI(standardDestinationDefinition.getDocumentationUrl()))
.icon(loadIcon(standardDestinationDefinition.getIcon()))
.releaseStage(getReleaseStage(standardDestinationDefinition))
.releaseDate(getReleaseDate(standardDestinationDefinition));
.releaseDate(getReleaseDate(standardDestinationDefinition))
.resourceRequirements(ApiPojoConverters.actorDefResourceReqsToApi(standardDestinationDefinition.getResourceRequirements()));
} catch (final URISyntaxException | NullPointerException e) {
throw new InternalServerKnownException("Unable to process retrieved latest destination definitions list", e);
}
@@ -127,23 +130,24 @@ public DestinationDefinitionRead getDestinationDefinition(final DestinationDefin
configRepository.getStandardDestinationDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId()));
}

public DestinationDefinitionRead createCustomDestinationDefinition(final DestinationDefinitionCreate destinationDefinitionCreate)
public DestinationDefinitionRead createCustomDestinationDefinition(final DestinationDefinitionCreate destinationDefCreate)
throws JsonValidationException, IOException {
final ConnectorSpecification spec = getSpecForImage(
destinationDefinitionCreate.getDockerRepository(),
destinationDefinitionCreate.getDockerImageTag());
destinationDefCreate.getDockerRepository(),
destinationDefCreate.getDockerImageTag());

final UUID id = uuidSupplier.get();
final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition()
.withDestinationDefinitionId(id)
.withDockerRepository(destinationDefinitionCreate.getDockerRepository())
.withDockerImageTag(destinationDefinitionCreate.getDockerImageTag())
.withDocumentationUrl(destinationDefinitionCreate.getDocumentationUrl().toString())
.withName(destinationDefinitionCreate.getName())
.withIcon(destinationDefinitionCreate.getIcon())
.withDockerRepository(destinationDefCreate.getDockerRepository())
.withDockerImageTag(destinationDefCreate.getDockerImageTag())
.withDocumentationUrl(destinationDefCreate.getDocumentationUrl().toString())
.withName(destinationDefCreate.getName())
.withIcon(destinationDefCreate.getIcon())
.withSpec(spec)
.withTombstone(false)
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM);
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)
.withResourceRequirements(ApiPojoConverters.actorDefResourceReqsToInternal(destinationDefCreate.getResourceRequirements()));

configRepository.writeStandardDestinationDefinition(destinationDefinition);

@@ -162,6 +166,9 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
final ConnectorSpecification spec = specNeedsUpdate
? getSpecForImage(currentDestination.getDockerRepository(), destinationDefinitionUpdate.getDockerImageTag())
: currentDestination.getSpec();
final ActorDefinitionResourceRequirements updatedResourceReqs = destinationDefinitionUpdate.getResourceRequirements() != null
? ApiPojoConverters.actorDefResourceReqsToInternal(destinationDefinitionUpdate.getResourceRequirements())
: currentDestination.getResourceRequirements();

final StandardDestinationDefinition newDestination = new StandardDestinationDefinition()
.withDestinationDefinitionId(currentDestination.getDestinationDefinitionId())
@@ -173,7 +180,8 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
.withSpec(spec)
.withTombstone(currentDestination.getTombstone())
.withReleaseStage(currentDestination.getReleaseStage())
.withReleaseDate(currentDestination.getReleaseDate());
.withReleaseDate(currentDestination.getReleaseDate())
.withResourceRequirements(updatedResourceReqs);

configRepository.writeStandardDestinationDefinition(newDestination);
return buildDestinationDefinitionRead(newDestination);
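One behavioral detail worth noting: because the update path falls back to currentDestination.getResourceRequirements() when the request carries none, bumping a docker image tag does not wipe previously configured limits. A rough sketch of that case; the DestinationDefinitionUpdate fluent setters, the handler variable, and the tag value are illustrative assumptions, not part of this diff:

// Only the image tag changes; resourceRequirements is left unset on the request,
// so the existing definition-level requirements are carried over to the new record.
final DestinationDefinitionRead updated = destinationDefinitionsHandler.updateDestinationDefinition(
    new DestinationDefinitionUpdate()
        .destinationDefinitionId(existingSnowflakeDefinitionId)
        .dockerImageTag("0.4.16"));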