From f8aeea566cac298523dde33c2f93e20ec2f94877 Mon Sep 17 00:00:00 2001 From: cgardens Date: Sat, 19 Feb 2022 17:00:09 -0800 Subject: [PATCH 1/7] expose in api --- airbyte-api/src/main/openapi/config.yaml | 40 +++++++++++ .../server/converters/ApiPojoConverters.java | 72 ++++++++++++++++--- .../DestinationDefinitionsHandler.java | 30 +++++--- .../server/handlers/SchedulerHandler.java | 4 +- .../handlers/SourceDefinitionsHandler.java | 21 ++++-- .../DestinationDefinitionsHandlerTest.java | 47 ++++++++---- .../server/handlers/SchedulerHandlerTest.java | 22 +++++- .../SourceDefinitionsHandlerTest.java | 31 ++++++-- 8 files changed, 217 insertions(+), 50 deletions(-) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index dca784363257e..d6258fee2f2cf 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2124,6 +2124,8 @@ components: format: uri icon: type: string + resourceRequirements: + $ref: "#/components/schemas/ActorDefinitionResourceRequirements" SourceDefinitionUpdate: type: object description: Update the SourceDefinition. Currently, the only allowed attribute to update is the default docker image version. @@ -2135,6 +2137,8 @@ components: $ref: "#/components/schemas/SourceDefinitionId" dockerImageTag: type: string + resourceRequirements: + $ref: "#/components/schemas/ActorDefinitionResourceRequirements" SourceDefinitionRead: type: object required: @@ -2162,6 +2166,8 @@ components: description: The date when this connector was first released, in yyyy-mm-dd format. type: string format: date + resourceRequirements: + $ref: "#/components/schemas/ActorDefinitionResourceRequirements" SourceDefinitionReadList: type: object required: @@ -2393,6 +2399,8 @@ components: format: uri icon: type: string + resourceRequirements: + $ref: "#/components/schemas/ActorDefinitionResourceRequirements" DestinationDefinitionUpdate: type: object required: @@ -2403,6 +2411,8 @@ components: $ref: "#/components/schemas/DestinationDefinitionId" dockerImageTag: type: string + resourceRequirements: + $ref: "#/components/schemas/ActorDefinitionResourceRequirements" DestinationDefinitionRead: type: object required: @@ -2431,6 +2441,8 @@ components: description: The date when this connector was first released, in yyyy-mm-dd format. type: string format: date + resourceRequirements: + $ref: "#/components/schemas/ActorDefinitionResourceRequirements" DestinationDefinitionReadList: type: object required: @@ -3475,6 +3487,34 @@ components: $ref: "#/components/schemas/ConnectionStateObject" ConnectionStateObject: type: object + ActorDefinitionResourceRequirements: + description: actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level. + type: object + additionalProperties: true + properties: + default: + "$ref": "#/definitions/ResourceRequirements" + jobSpecific: + type: array + items: + "$ref": "#/definitions/JobTypeResourceLimit" + JobTypeResourceLimit: + description: sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set. + type: object + properties: + jobType: + "$ref": "#/definitions/JobType" + resourceRequirements: + "$ref": "#/definitions/ResourceRequirements" + JobType: + description: enum that describes the different types of jobs that the platform runs. + type: string + # todo (cgardens) - sync naming with java + enum: + - sync + - check + - discover + - spec ResourceRequirements: description: optional resource requirements to run workers (blank for unbounded allocations) type: object diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java b/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java index d02c3cab743f1..3807e0ab30aac 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java @@ -4,33 +4,77 @@ package io.airbyte.server.converters; +import io.airbyte.api.model.ActorDefinitionResourceRequirements; import io.airbyte.api.model.ConnectionRead; import io.airbyte.api.model.ConnectionSchedule; import io.airbyte.api.model.ConnectionStatus; import io.airbyte.api.model.ConnectionUpdate; +import io.airbyte.api.model.JobType; +import io.airbyte.api.model.JobTypeResourceLimit; import io.airbyte.api.model.ResourceRequirements; import io.airbyte.commons.enums.Enums; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Schedule; import io.airbyte.config.StandardSync; import io.airbyte.workers.helper.CatalogConverter; +import java.util.stream.Collectors; public class ApiPojoConverters { - public static io.airbyte.config.ResourceRequirements resourceRequirementsToInternal(final ResourceRequirements resourceRequirements) { + public static io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqsToInternal(final ActorDefinitionResourceRequirements actorDefResourceReqs) { + if (actorDefResourceReqs == null) { + return null; + } + + return new io.airbyte.config.ActorDefinitionResourceRequirements() + .withDefault(actorDefResourceReqs.getDefault() == null ? null : resourceRequirementsToInternal(actorDefResourceReqs.getDefault())) + .withJobSpecific(actorDefResourceReqs.getJobSpecific() == null ? null + : actorDefResourceReqs.getJobSpecific() + .stream() + .map(jobSpecific -> new io.airbyte.config.JobTypeResourceLimit() + .withJobType(toInternalJobType(jobSpecific.getJobType())) + .withResourceRequirements(resourceRequirementsToInternal(jobSpecific.getResourceRequirements()))) + .collect(Collectors.toList())); + } + + public static ActorDefinitionResourceRequirements actorDefResourceReqsToApi(final io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqs) { + if (actorDefResourceReqs == null) { + return null; + } + + return new ActorDefinitionResourceRequirements() + ._default(actorDefResourceReqs.getDefault() == null ? null : resourceRequirementsToApi(actorDefResourceReqs.getDefault())) + .jobSpecific(actorDefResourceReqs.getJobSpecific() == null ? null + : actorDefResourceReqs.getJobSpecific() + .stream() + .map(jobSpecific -> new JobTypeResourceLimit() + .jobType(toApiJobType(jobSpecific.getJobType())) + .resourceRequirements(resourceRequirementsToApi(jobSpecific.getResourceRequirements()))) + .collect(Collectors.toList())); + } + + public static io.airbyte.config.ResourceRequirements resourceRequirementsToInternal(final ResourceRequirements resourceReqs) { + if (resourceReqs == null) { + return null; + } + return new io.airbyte.config.ResourceRequirements() - .withCpuRequest(resourceRequirements.getCpuRequest()) - .withCpuLimit(resourceRequirements.getCpuLimit()) - .withMemoryRequest(resourceRequirements.getMemoryRequest()) - .withMemoryLimit(resourceRequirements.getMemoryLimit()); + .withCpuRequest(resourceReqs.getCpuRequest()) + .withCpuLimit(resourceReqs.getCpuLimit()) + .withMemoryRequest(resourceReqs.getMemoryRequest()) + .withMemoryLimit(resourceReqs.getMemoryLimit()); } - public static ResourceRequirements resourceRequirementsToApi(final io.airbyte.config.ResourceRequirements resourceRequirements) { + public static ResourceRequirements resourceRequirementsToApi(final io.airbyte.config.ResourceRequirements resourceReqs) { + if (resourceReqs == null) { + return null; + } + return new ResourceRequirements() - .cpuRequest(resourceRequirements.getCpuRequest()) - .cpuLimit(resourceRequirements.getCpuLimit()) - .memoryRequest(resourceRequirements.getMemoryRequest()) - .memoryLimit(resourceRequirements.getMemoryLimit()); + .cpuRequest(resourceReqs.getCpuRequest()) + .cpuLimit(resourceReqs.getCpuLimit()) + .memoryRequest(resourceReqs.getMemoryRequest()) + .memoryLimit(resourceReqs.getMemoryLimit()); } public static io.airbyte.config.StandardSync connectionUpdateToInternal(final ConnectionUpdate update) { @@ -90,6 +134,14 @@ public static ConnectionRead internalToConnectionRead(final StandardSync standar return connectionRead; } + public static JobType toApiJobType(final io.airbyte.config.JobTypeResourceLimit.JobType jobType) { + return Enums.convertTo(jobType, JobType.class); + } + + public static io.airbyte.config.JobTypeResourceLimit.JobType toInternalJobType(final JobType jobType) { + return Enums.convertTo(jobType, io.airbyte.config.JobTypeResourceLimit.JobType.class); + } + public static ConnectionSchedule.TimeUnitEnum toApiTimeUnit(final Schedule.TimeUnit apiTimeUnit) { return Enums.convertTo(apiTimeUnit, ConnectionSchedule.TimeUnitEnum.class); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java index 1065b31aed015..cb848e5cc4321 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java @@ -16,12 +16,14 @@ import io.airbyte.api.model.ReleaseStage; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.client.SynchronousResponse; import io.airbyte.scheduler.client.SynchronousSchedulerClient; +import io.airbyte.server.converters.ApiPojoConverters; import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.InternalServerKnownException; import io.airbyte.server.services.AirbyteGithubStore; @@ -77,7 +79,8 @@ static DestinationDefinitionRead buildDestinationDefinitionRead(final StandardDe .documentationUrl(new URI(standardDestinationDefinition.getDocumentationUrl())) .icon(loadIcon(standardDestinationDefinition.getIcon())) .releaseStage(getReleaseStage(standardDestinationDefinition)) - .releaseDate(getReleaseDate(standardDestinationDefinition)); + .releaseDate(getReleaseDate(standardDestinationDefinition)) + .resourceRequirements(ApiPojoConverters.actorDefResourceReqsToApi(standardDestinationDefinition.getResourceRequirements())); } catch (final URISyntaxException | NullPointerException e) { throw new InternalServerKnownException("Unable to process retrieved latest destination definitions list", e); } @@ -127,23 +130,24 @@ public DestinationDefinitionRead getDestinationDefinition(final DestinationDefin configRepository.getStandardDestinationDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId())); } - public DestinationDefinitionRead createCustomDestinationDefinition(final DestinationDefinitionCreate destinationDefinitionCreate) + public DestinationDefinitionRead createCustomDestinationDefinition(final DestinationDefinitionCreate destinationDefCreate) throws JsonValidationException, IOException { final ConnectorSpecification spec = getSpecForImage( - destinationDefinitionCreate.getDockerRepository(), - destinationDefinitionCreate.getDockerImageTag()); + destinationDefCreate.getDockerRepository(), + destinationDefCreate.getDockerImageTag()); final UUID id = uuidSupplier.get(); final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition() .withDestinationDefinitionId(id) - .withDockerRepository(destinationDefinitionCreate.getDockerRepository()) - .withDockerImageTag(destinationDefinitionCreate.getDockerImageTag()) - .withDocumentationUrl(destinationDefinitionCreate.getDocumentationUrl().toString()) - .withName(destinationDefinitionCreate.getName()) - .withIcon(destinationDefinitionCreate.getIcon()) + .withDockerRepository(destinationDefCreate.getDockerRepository()) + .withDockerImageTag(destinationDefCreate.getDockerImageTag()) + .withDocumentationUrl(destinationDefCreate.getDocumentationUrl().toString()) + .withName(destinationDefCreate.getName()) + .withIcon(destinationDefCreate.getIcon()) .withSpec(spec) .withTombstone(false) - .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM); + .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM) + .withResourceRequirements(ApiPojoConverters.actorDefResourceReqsToInternal(destinationDefCreate.getResourceRequirements())); configRepository.writeStandardDestinationDefinition(destinationDefinition); @@ -162,6 +166,9 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe final ConnectorSpecification spec = specNeedsUpdate ? getSpecForImage(currentDestination.getDockerRepository(), destinationDefinitionUpdate.getDockerImageTag()) : currentDestination.getSpec(); + final ActorDefinitionResourceRequirements updatedResourceReqs = destinationDefinitionUpdate.getResourceRequirements() != null + ? ApiPojoConverters.actorDefResourceReqsToInternal(destinationDefinitionUpdate.getResourceRequirements()) + : currentDestination.getResourceRequirements(); final StandardDestinationDefinition newDestination = new StandardDestinationDefinition() .withDestinationDefinitionId(currentDestination.getDestinationDefinitionId()) @@ -173,7 +180,8 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe .withSpec(spec) .withTombstone(currentDestination.getTombstone()) .withReleaseStage(currentDestination.getReleaseStage()) - .withReleaseDate(currentDestination.getReleaseDate()); + .withReleaseDate(currentDestination.getReleaseDate()) + .withResourceRequirements(updatedResourceReqs); configRepository.writeStandardDestinationDefinition(newDestination); return buildDestinationDefinitionRead(newDestination); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index a56a2107f206f..54e16a91c277f 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -342,7 +342,9 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ standardSync, sourceImageName, destinationImageName, - standardSyncOperations); + standardSyncOperations, + sourceDef.getResourceRequirements(), + destinationDef.getResourceRequirements()); return jobConverter.getJobInfoRead(job); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java index 80907169babcc..3fbd66b964515 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java @@ -16,12 +16,14 @@ import io.airbyte.api.model.SourceRead; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.client.SynchronousResponse; import io.airbyte.scheduler.client.SynchronousSchedulerClient; +import io.airbyte.server.converters.ApiPojoConverters; import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.InternalServerKnownException; import io.airbyte.server.services.AirbyteGithubStore; @@ -43,15 +45,13 @@ public class SourceDefinitionsHandler { private final SynchronousSchedulerClient schedulerSynchronousClient; private final SourceHandler sourceHandler; - public SourceDefinitionsHandler( - final ConfigRepository configRepository, + public SourceDefinitionsHandler(final ConfigRepository configRepository, final SynchronousSchedulerClient schedulerSynchronousClient, final SourceHandler sourceHandler) { this(configRepository, UUID::randomUUID, schedulerSynchronousClient, AirbyteGithubStore.production(), sourceHandler); } - public SourceDefinitionsHandler( - final ConfigRepository configRepository, + public SourceDefinitionsHandler(final ConfigRepository configRepository, final Supplier uuidSupplier, final SynchronousSchedulerClient schedulerSynchronousClient, final AirbyteGithubStore githubStore, @@ -74,7 +74,9 @@ static SourceDefinitionRead buildSourceDefinitionRead(final StandardSourceDefini .documentationUrl(new URI(standardSourceDefinition.getDocumentationUrl())) .icon(loadIcon(standardSourceDefinition.getIcon())) .releaseStage(getReleaseStage(standardSourceDefinition)) - .releaseDate(getReleaseDate(standardSourceDefinition)); + .releaseDate(getReleaseDate(standardSourceDefinition)) + .resourceRequirements(ApiPojoConverters.actorDefResourceReqsToApi(standardSourceDefinition.getResourceRequirements())); + } catch (final URISyntaxException | NullPointerException e) { throw new InternalServerKnownException("Unable to process retrieved latest source definitions list", e); } @@ -137,7 +139,8 @@ public SourceDefinitionRead createCustomSourceDefinition(final SourceDefinitionC .withIcon(sourceDefinitionCreate.getIcon()) .withSpec(spec) .withTombstone(false) - .withReleaseStage(StandardSourceDefinition.ReleaseStage.CUSTOM); + .withReleaseStage(StandardSourceDefinition.ReleaseStage.CUSTOM) + .withResourceRequirements(ApiPojoConverters.actorDefResourceReqsToInternal(sourceDefinitionCreate.getResourceRequirements())); configRepository.writeStandardSourceDefinition(sourceDefinition); @@ -156,6 +159,9 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate final ConnectorSpecification spec = specNeedsUpdate ? getSpecForImage(currentSourceDefinition.getDockerRepository(), sourceDefinitionUpdate.getDockerImageTag()) : currentSourceDefinition.getSpec(); + final ActorDefinitionResourceRequirements updatedResourceReqs = sourceDefinitionUpdate.getResourceRequirements() != null + ? ApiPojoConverters.actorDefResourceReqsToInternal(sourceDefinitionUpdate.getResourceRequirements()) + : currentSourceDefinition.getResourceRequirements(); final StandardSourceDefinition newSource = new StandardSourceDefinition() .withSourceDefinitionId(currentSourceDefinition.getSourceDefinitionId()) @@ -167,7 +173,8 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate .withSpec(spec) .withTombstone(currentSourceDefinition.getTombstone()) .withReleaseStage(currentSourceDefinition.getReleaseStage()) - .withReleaseDate(currentSourceDefinition.getReleaseDate()); + .withReleaseDate(currentSourceDefinition.getReleaseDate()) + .withResourceRequirements(updatedResourceReqs); configRepository.writeStandardSourceDefinition(newSource); return buildSourceDefinitionRead(newSource); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java index 402fc952ba570..68aa346f6a214 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java @@ -24,7 +24,9 @@ import io.airbyte.api.model.ReleaseStage; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.JobConfig.ConfigType; +import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; @@ -68,13 +70,17 @@ void setUp() { githubStore = mock(AirbyteGithubStore.class); destinationHandler = mock(DestinationHandler.class); - destinationDefinitionsHandler = - new DestinationDefinitionsHandler(configRepository, uuidSupplier, schedulerSynchronousClient, githubStore, destinationHandler); + destinationDefinitionsHandler = new DestinationDefinitionsHandler( + configRepository, + uuidSupplier, + schedulerSynchronousClient, + githubStore, + destinationHandler); } private StandardDestinationDefinition generateDestinationDefinition() { - final ConnectorSpecification spec = new ConnectorSpecification().withConnectionSpecification( - Jsons.jsonNode(ImmutableMap.of("foo", "bar"))); + final ConnectorSpecification spec = new ConnectorSpecification() + .withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo", "bar"))); return new StandardDestinationDefinition() .withDestinationDefinitionId(UUID.randomUUID()) @@ -86,7 +92,8 @@ private StandardDestinationDefinition generateDestinationDefinition() { .withSpec(spec) .withTombstone(false) .withReleaseStage(StandardDestinationDefinition.ReleaseStage.ALPHA) - .withReleaseDate(TODAY_DATE_STRING); + .withReleaseDate(TODAY_DATE_STRING) + .withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(new ResourceRequirements().withCpuRequest("2"))); } @Test @@ -104,7 +111,10 @@ void testListDestinations() throws JsonValidationException, IOException, URISynt .documentationUrl(new URI(destinationDefinition.getDocumentationUrl())) .icon(DestinationDefinitionsHandler.loadIcon(destinationDefinition.getIcon())) .releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value())) - .releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate())); + .releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate())) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(destinationDefinition.getResourceRequirements().getDefault().getCpuRequest()))); final DestinationDefinitionRead expectedDestinationDefinitionRead2 = new DestinationDefinitionRead() .destinationDefinitionId(destination2.getDestinationDefinitionId()) @@ -114,7 +124,10 @@ void testListDestinations() throws JsonValidationException, IOException, URISynt .documentationUrl(new URI(destination2.getDocumentationUrl())) .icon(DestinationDefinitionsHandler.loadIcon(destination2.getIcon())) .releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value())) - .releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate())); + .releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate())) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(destination2.getResourceRequirements().getDefault().getCpuRequest()))); final DestinationDefinitionReadList actualDestinationDefinitionReadList = destinationDefinitionsHandler.listDestinationDefinitions(); @@ -137,7 +150,10 @@ void testGetDestination() throws JsonValidationException, ConfigNotFoundExceptio .documentationUrl(new URI(destinationDefinition.getDocumentationUrl())) .icon(DestinationDefinitionsHandler.loadIcon(destinationDefinition.getIcon())) .releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value())) - .releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate())); + .releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate())) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(destinationDefinition.getResourceRequirements().getDefault().getCpuRequest()))); final DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody = new DestinationDefinitionIdRequestBody() .destinationDefinitionId(destinationDefinition.getDestinationDefinitionId()); @@ -164,7 +180,10 @@ void testCreateDestinationDefinition() throws URISyntaxException, IOException, J .dockerRepository(destination.getDockerRepository()) .dockerImageTag(destination.getDockerImageTag()) .documentationUrl(new URI(destination.getDocumentationUrl())) - .icon(destination.getIcon()); + .icon(destination.getIcon()) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(destination.getResourceRequirements().getDefault().getCpuRequest()))); final DestinationDefinitionRead expectedRead = new DestinationDefinitionRead() .name(destination.getName()) @@ -173,14 +192,18 @@ void testCreateDestinationDefinition() throws URISyntaxException, IOException, J .documentationUrl(new URI(destination.getDocumentationUrl())) .destinationDefinitionId(destination.getDestinationDefinitionId()) .icon(DestinationDefinitionsHandler.loadIcon(destination.getIcon())) - .releaseStage(ReleaseStage.CUSTOM); + .releaseStage(ReleaseStage.CUSTOM) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(destination.getResourceRequirements().getDefault().getCpuRequest()))); final DestinationDefinitionRead actualRead = destinationDefinitionsHandler.createCustomDestinationDefinition(create); assertEquals(expectedRead, actualRead); verify(schedulerSynchronousClient).createGetSpecJob(imageName); - verify(configRepository).writeStandardDestinationDefinition(destination.withReleaseDate(null).withReleaseStage( - StandardDestinationDefinition.ReleaseStage.CUSTOM)); + verify(configRepository).writeStandardDestinationDefinition(destination + .withReleaseDate(null) + .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)); } @Test diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 70f4d3e2b0b3a..11481fc34829e 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -501,8 +501,16 @@ void testSyncConnection() throws JsonValidationException, IOException, ConfigNot when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); when(configRepository.getDestinationConnection(destination.getDestinationId())).thenReturn(destination); when(configRepository.getStandardSyncOperation(operationId)).thenReturn(getOperation(operationId)); - when(schedulerJobClient.createOrGetActiveSyncJob(source, destination, standardSync, SOURCE_DOCKER_IMAGE, DESTINATION_DOCKER_IMAGE, operations)) - .thenReturn(completedJob); + when(schedulerJobClient.createOrGetActiveSyncJob( + source, + destination, + standardSync, + SOURCE_DOCKER_IMAGE, + DESTINATION_DOCKER_IMAGE, + operations, + null, + null)) + .thenReturn(completedJob); when(completedJob.getScope()).thenReturn("cat:12"); final JobConfig jobConfig = mock(JobConfig.class); when(completedJob.getConfig()).thenReturn(jobConfig); @@ -514,7 +522,15 @@ void testSyncConnection() throws JsonValidationException, IOException, ConfigNot verify(configRepository).getStandardSync(standardSync.getConnectionId()); verify(configRepository).getSourceConnection(standardSync.getSourceId()); verify(configRepository).getDestinationConnection(standardSync.getDestinationId()); - verify(schedulerJobClient).createOrGetActiveSyncJob(source, destination, standardSync, SOURCE_DOCKER_IMAGE, DESTINATION_DOCKER_IMAGE, operations); + verify(schedulerJobClient).createOrGetActiveSyncJob( + source, + destination, + standardSync, + SOURCE_DOCKER_IMAGE, + DESTINATION_DOCKER_IMAGE, + operations, + null, + null); } @Test diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java index 7b6d8e644baae..245ca1067656e 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java @@ -25,7 +25,9 @@ import io.airbyte.api.model.SourceReadList; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.JobConfig.ConfigType; +import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; @@ -88,7 +90,9 @@ private StandardSourceDefinition generateSourceDefinition() { .withSpec(spec) .withTombstone(false) .withReleaseStage(StandardSourceDefinition.ReleaseStage.ALPHA) - .withReleaseDate(TODAY_DATE_STRING); + .withReleaseDate(TODAY_DATE_STRING) + .withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(new ResourceRequirements().withCpuRequest("2"))); + } @Test @@ -106,7 +110,10 @@ void testListSourceDefinitions() throws JsonValidationException, IOException, UR .documentationUrl(new URI(sourceDefinition.getDocumentationUrl())) .icon(SourceDefinitionsHandler.loadIcon(sourceDefinition.getIcon())) .releaseStage(ReleaseStage.fromValue(sourceDefinition.getReleaseStage().value())) - .releaseDate(LocalDate.parse(sourceDefinition.getReleaseDate())); + .releaseDate(LocalDate.parse(sourceDefinition.getReleaseDate())) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(sourceDefinition.getResourceRequirements().getDefault().getCpuRequest()))); final SourceDefinitionRead expectedSourceDefinitionRead2 = new SourceDefinitionRead() .sourceDefinitionId(sourceDefinition2.getSourceDefinitionId()) @@ -116,7 +123,10 @@ void testListSourceDefinitions() throws JsonValidationException, IOException, UR .documentationUrl(new URI(sourceDefinition.getDocumentationUrl())) .icon(SourceDefinitionsHandler.loadIcon(sourceDefinition.getIcon())) .releaseStage(ReleaseStage.fromValue(sourceDefinition.getReleaseStage().value())) - .releaseDate(LocalDate.parse(sourceDefinition.getReleaseDate())); + .releaseDate(LocalDate.parse(sourceDefinition.getReleaseDate())) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(sourceDefinition2.getResourceRequirements().getDefault().getCpuRequest()))); final SourceDefinitionReadList actualSourceDefinitionReadList = sourceDefinitionsHandler.listSourceDefinitions(); @@ -139,7 +149,10 @@ void testGetSourceDefinition() throws JsonValidationException, ConfigNotFoundExc .documentationUrl(new URI(sourceDefinition.getDocumentationUrl())) .icon(SourceDefinitionsHandler.loadIcon(sourceDefinition.getIcon())) .releaseStage(ReleaseStage.fromValue(sourceDefinition.getReleaseStage().value())) - .releaseDate(LocalDate.parse(sourceDefinition.getReleaseDate())); + .releaseDate(LocalDate.parse(sourceDefinition.getReleaseDate())) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(sourceDefinition.getResourceRequirements().getDefault().getCpuRequest()))); final SourceDefinitionIdRequestBody sourceDefinitionIdRequestBody = new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinition.getSourceDefinitionId()); @@ -165,7 +178,10 @@ void testCreateSourceDefinition() throws URISyntaxException, IOException, JsonVa .dockerRepository(sourceDefinition.getDockerRepository()) .dockerImageTag(sourceDefinition.getDockerImageTag()) .documentationUrl(new URI(sourceDefinition.getDocumentationUrl())) - .icon(sourceDefinition.getIcon()); + .icon(sourceDefinition.getIcon()) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(sourceDefinition.getResourceRequirements().getDefault().getCpuRequest()))); final SourceDefinitionRead expectedRead = new SourceDefinitionRead() .name(sourceDefinition.getName()) @@ -174,7 +190,10 @@ void testCreateSourceDefinition() throws URISyntaxException, IOException, JsonVa .documentationUrl(new URI(sourceDefinition.getDocumentationUrl())) .sourceDefinitionId(sourceDefinition.getSourceDefinitionId()) .icon(SourceDefinitionsHandler.loadIcon(sourceDefinition.getIcon())) - .releaseStage(ReleaseStage.CUSTOM); + .releaseStage(ReleaseStage.CUSTOM) + .resourceRequirements(new io.airbyte.api.model.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.ResourceRequirements() + .cpuRequest(sourceDefinition.getResourceRequirements().getDefault().getCpuRequest()))); final SourceDefinitionRead actualRead = sourceDefinitionsHandler.createCustomSourceDefinition(create); From b54bfc094784b4c741baa3afbb50f399021fdaf9 Mon Sep 17 00:00:00 2001 From: cgardens Date: Sat, 19 Feb 2022 18:10:32 -0800 Subject: [PATCH 2/7] fix compile --- .../java/io/airbyte/server/handlers/SchedulerHandler.java | 4 +--- .../io/airbyte/server/handlers/SchedulerHandlerTest.java | 8 ++------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 54e16a91c277f..a56a2107f206f 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -342,9 +342,7 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ standardSync, sourceImageName, destinationImageName, - standardSyncOperations, - sourceDef.getResourceRequirements(), - destinationDef.getResourceRequirements()); + standardSyncOperations); return jobConverter.getJobInfoRead(job); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 11481fc34829e..a5e74e019e149 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -507,9 +507,7 @@ void testSyncConnection() throws JsonValidationException, IOException, ConfigNot standardSync, SOURCE_DOCKER_IMAGE, DESTINATION_DOCKER_IMAGE, - operations, - null, - null)) + operations)) .thenReturn(completedJob); when(completedJob.getScope()).thenReturn("cat:12"); final JobConfig jobConfig = mock(JobConfig.class); @@ -528,9 +526,7 @@ void testSyncConnection() throws JsonValidationException, IOException, ConfigNot standardSync, SOURCE_DOCKER_IMAGE, DESTINATION_DOCKER_IMAGE, - operations, - null, - null); + operations); } @Test From be7e0b0cd660d3b68e242d71feda1c5906fb3989 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 22 Feb 2022 18:38:56 -0800 Subject: [PATCH 3/7] clean --- airbyte-api/src/main/openapi/config.yaml | 10 +- .../api/generated-api-html/index.html | 399 ++++++++++++++++++ 2 files changed, 405 insertions(+), 4 deletions(-) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index d6258fee2f2cf..1a31229c73f54 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -3509,12 +3509,14 @@ components: JobType: description: enum that describes the different types of jobs that the platform runs. type: string - # todo (cgardens) - sync naming with java enum: + - get_spec + - check_connection + - discover_schema - sync - - check - - discover - - spec + - reset_connection + - connection_updater + - replicate ResourceRequirements: description: optional resource requirements to run workers (blank for unbounded allocations) type: object diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index fc01581c5bd1d..e62bb1d48cd36 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -2579,6 +2579,29 @@

Return type

Example data

Content-Type: application/json
{
+  "resourceRequirements" : {
+    "default" : {
+      "cpu_limit" : "cpu_limit",
+      "memory_request" : "memory_request",
+      "memory_limit" : "memory_limit",
+      "cpu_request" : "cpu_request"
+    },
+    "jobSpecific" : [ {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    }, {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    } ]
+  },
   "documentationUrl" : "https://openapi-generator.tech",
   "dockerImageTag" : "dockerImageTag",
   "releaseDate" : "2000-01-23",
@@ -2688,6 +2711,29 @@ 

Return type

Example data

Content-Type: application/json
{
+  "resourceRequirements" : {
+    "default" : {
+      "cpu_limit" : "cpu_limit",
+      "memory_request" : "memory_request",
+      "memory_limit" : "memory_limit",
+      "cpu_request" : "cpu_request"
+    },
+    "jobSpecific" : [ {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    }, {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    } ]
+  },
   "documentationUrl" : "https://openapi-generator.tech",
   "dockerImageTag" : "dockerImageTag",
   "releaseDate" : "2000-01-23",
@@ -2741,6 +2787,29 @@ 

Example data

Content-Type: application/json
{
   "destinationDefinitions" : [ {
+    "resourceRequirements" : {
+      "default" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      },
+      "jobSpecific" : [ {
+        "resourceRequirements" : {
+          "cpu_limit" : "cpu_limit",
+          "memory_request" : "memory_request",
+          "memory_limit" : "memory_limit",
+          "cpu_request" : "cpu_request"
+        }
+      }, {
+        "resourceRequirements" : {
+          "cpu_limit" : "cpu_limit",
+          "memory_request" : "memory_request",
+          "memory_limit" : "memory_limit",
+          "cpu_request" : "cpu_request"
+        }
+      } ]
+    },
     "documentationUrl" : "https://openapi-generator.tech",
     "dockerImageTag" : "dockerImageTag",
     "releaseDate" : "2000-01-23",
@@ -2749,6 +2818,29 @@ 

Example data

"icon" : "icon", "destinationDefinitionId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91" }, { + "resourceRequirements" : { + "default" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + }, + "jobSpecific" : [ { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + }, { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + } ] + }, "documentationUrl" : "https://openapi-generator.tech", "dockerImageTag" : "dockerImageTag", "releaseDate" : "2000-01-23", @@ -2797,6 +2889,29 @@

Example data

Content-Type: application/json
{
   "destinationDefinitions" : [ {
+    "resourceRequirements" : {
+      "default" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      },
+      "jobSpecific" : [ {
+        "resourceRequirements" : {
+          "cpu_limit" : "cpu_limit",
+          "memory_request" : "memory_request",
+          "memory_limit" : "memory_limit",
+          "cpu_request" : "cpu_request"
+        }
+      }, {
+        "resourceRequirements" : {
+          "cpu_limit" : "cpu_limit",
+          "memory_request" : "memory_request",
+          "memory_limit" : "memory_limit",
+          "cpu_request" : "cpu_request"
+        }
+      } ]
+    },
     "documentationUrl" : "https://openapi-generator.tech",
     "dockerImageTag" : "dockerImageTag",
     "releaseDate" : "2000-01-23",
@@ -2805,6 +2920,29 @@ 

Example data

"icon" : "icon", "destinationDefinitionId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91" }, { + "resourceRequirements" : { + "default" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + }, + "jobSpecific" : [ { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + }, { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + } ] + }, "documentationUrl" : "https://openapi-generator.tech", "dockerImageTag" : "dockerImageTag", "releaseDate" : "2000-01-23", @@ -2864,6 +3002,29 @@

Return type

Example data

Content-Type: application/json
{
+  "resourceRequirements" : {
+    "default" : {
+      "cpu_limit" : "cpu_limit",
+      "memory_request" : "memory_request",
+      "memory_limit" : "memory_limit",
+      "cpu_request" : "cpu_request"
+    },
+    "jobSpecific" : [ {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    }, {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    } ]
+  },
   "documentationUrl" : "https://openapi-generator.tech",
   "dockerImageTag" : "dockerImageTag",
   "releaseDate" : "2000-01-23",
@@ -3227,6 +3388,29 @@ 

Example data

"job" : { "configId" : "configId", "sourceDefinition" : { + "resourceRequirements" : { + "default" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + }, + "jobSpecific" : [ { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + }, { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + } ] + }, "documentationUrl" : "https://openapi-generator.tech", "dockerImageTag" : "dockerImageTag", "releaseDate" : "2000-01-23", @@ -3238,6 +3422,29 @@

Example data

"airbyteVersion" : "airbyteVersion", "id" : 0, "destinationDefinition" : { + "resourceRequirements" : { + "default" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + }, + "jobSpecific" : [ { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + }, { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + } ] + }, "documentationUrl" : "https://openapi-generator.tech", "dockerImageTag" : "dockerImageTag", "releaseDate" : "2000-01-23", @@ -5609,6 +5816,29 @@

Return type

Example data

Content-Type: application/json
{
+  "resourceRequirements" : {
+    "default" : {
+      "cpu_limit" : "cpu_limit",
+      "memory_request" : "memory_request",
+      "memory_limit" : "memory_limit",
+      "cpu_request" : "cpu_request"
+    },
+    "jobSpecific" : [ {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    }, {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    } ]
+  },
   "documentationUrl" : "https://openapi-generator.tech",
   "dockerImageTag" : "dockerImageTag",
   "releaseDate" : "2000-01-23",
@@ -5718,6 +5948,29 @@ 

Return type

Example data

Content-Type: application/json
{
+  "resourceRequirements" : {
+    "default" : {
+      "cpu_limit" : "cpu_limit",
+      "memory_request" : "memory_request",
+      "memory_limit" : "memory_limit",
+      "cpu_request" : "cpu_request"
+    },
+    "jobSpecific" : [ {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    }, {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    } ]
+  },
   "documentationUrl" : "https://openapi-generator.tech",
   "dockerImageTag" : "dockerImageTag",
   "releaseDate" : "2000-01-23",
@@ -5771,6 +6024,29 @@ 

Example data

Content-Type: application/json
{
   "sourceDefinitions" : [ {
+    "resourceRequirements" : {
+      "default" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      },
+      "jobSpecific" : [ {
+        "resourceRequirements" : {
+          "cpu_limit" : "cpu_limit",
+          "memory_request" : "memory_request",
+          "memory_limit" : "memory_limit",
+          "cpu_request" : "cpu_request"
+        }
+      }, {
+        "resourceRequirements" : {
+          "cpu_limit" : "cpu_limit",
+          "memory_request" : "memory_request",
+          "memory_limit" : "memory_limit",
+          "cpu_request" : "cpu_request"
+        }
+      } ]
+    },
     "documentationUrl" : "https://openapi-generator.tech",
     "dockerImageTag" : "dockerImageTag",
     "releaseDate" : "2000-01-23",
@@ -5779,6 +6055,29 @@ 

Example data

"icon" : "icon", "sourceDefinitionId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91" }, { + "resourceRequirements" : { + "default" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + }, + "jobSpecific" : [ { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + }, { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + } ] + }, "documentationUrl" : "https://openapi-generator.tech", "dockerImageTag" : "dockerImageTag", "releaseDate" : "2000-01-23", @@ -5827,6 +6126,29 @@

Example data

Content-Type: application/json
{
   "sourceDefinitions" : [ {
+    "resourceRequirements" : {
+      "default" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      },
+      "jobSpecific" : [ {
+        "resourceRequirements" : {
+          "cpu_limit" : "cpu_limit",
+          "memory_request" : "memory_request",
+          "memory_limit" : "memory_limit",
+          "cpu_request" : "cpu_request"
+        }
+      }, {
+        "resourceRequirements" : {
+          "cpu_limit" : "cpu_limit",
+          "memory_request" : "memory_request",
+          "memory_limit" : "memory_limit",
+          "cpu_request" : "cpu_request"
+        }
+      } ]
+    },
     "documentationUrl" : "https://openapi-generator.tech",
     "dockerImageTag" : "dockerImageTag",
     "releaseDate" : "2000-01-23",
@@ -5835,6 +6157,29 @@ 

Example data

"icon" : "icon", "sourceDefinitionId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91" }, { + "resourceRequirements" : { + "default" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + }, + "jobSpecific" : [ { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + }, { + "resourceRequirements" : { + "cpu_limit" : "cpu_limit", + "memory_request" : "memory_request", + "memory_limit" : "memory_limit", + "cpu_request" : "cpu_request" + } + } ] + }, "documentationUrl" : "https://openapi-generator.tech", "dockerImageTag" : "dockerImageTag", "releaseDate" : "2000-01-23", @@ -5894,6 +6239,29 @@

Return type

Example data

Content-Type: application/json
{
+  "resourceRequirements" : {
+    "default" : {
+      "cpu_limit" : "cpu_limit",
+      "memory_request" : "memory_request",
+      "memory_limit" : "memory_limit",
+      "cpu_request" : "cpu_request"
+    },
+    "jobSpecific" : [ {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    }, {
+      "resourceRequirements" : {
+        "cpu_limit" : "cpu_limit",
+        "memory_request" : "memory_request",
+        "memory_limit" : "memory_limit",
+        "cpu_request" : "cpu_request"
+      }
+    } ]
+  },
   "documentationUrl" : "https://openapi-generator.tech",
   "dockerImageTag" : "dockerImageTag",
   "releaseDate" : "2000-01-23",
@@ -7880,6 +8248,7 @@ 

Models

Table of Contents

    +
  1. ActorDefinitionResourceRequirements -
  2. AdvancedAuth -
  3. AirbyteCatalog -
  4. AirbyteStream -
  5. @@ -7943,6 +8312,8 @@

    Table of Contents

  6. JobRead -
  7. JobReadList -
  8. JobStatus -
  9. +
  10. JobType -
  11. +
  12. JobTypeResourceLimit -
  13. JobWithAttemptsRead -
  14. KnownExceptionInfo -
  15. LogRead -
  16. @@ -8006,6 +8377,14 @@

    Table of Contents

  17. WorkspaceUpdateName -
+
+

ActorDefinitionResourceRequirements - Up

+
actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level.
+
+
default (optional)
+
jobSpecific (optional)
+
+
documentationUrl
URI format: uri
icon (optional)
+
resourceRequirements (optional)
@@ -8392,6 +8772,7 @@

DestinationDefinitionRead -
icon (optional)
releaseStage (optional)
releaseDate (optional)
date The date when this connector was first released, in yyyy-mm-dd format. format: date
+
resourceRequirements (optional)

@@ -8422,6 +8803,7 @@

DestinationDefinitionUpdate
destinationDefinitionId
UUID format: uuid
dockerImageTag (optional)
+
resourceRequirements (optional)

@@ -8607,6 +8989,20 @@

JobStatus -

+
+

JobType - Up

+
enum that describes the different types of jobs that the platform runs.
+
+
+
+
+

JobTypeResourceLimit - Up

+
sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set.
+
+
jobType (optional)
+
resourceRequirements (optional)
+
+

JobWithAttemptsRead - Up

@@ -8875,6 +9271,7 @@

SourceDefinitionCreate -
dockerImageTag
documentationUrl
URI format: uri
icon (optional)
+
resourceRequirements (optional)

releaseStage (optional)
releaseDate (optional)
date The date when this connector was first released, in yyyy-mm-dd format. format: date
+
resourceRequirements (optional)
@@ -8923,6 +9321,7 @@

SourceDefinitionUpdate -
sourceDefinitionId
UUID format: uuid
dockerImageTag
+
resourceRequirements (optional)

From 39a5f43772b2be820c57d1b95f8f0ebd0304a04c Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 22 Feb 2022 20:29:13 -0800 Subject: [PATCH 4/7] validate inputs --- airbyte-api/src/main/openapi/config.yaml | 6 +++++- .../init/YamlSeedConfigPersistence.java | 10 ++++++++-- .../seed/destination_definitions.yaml | 13 +++++++++++++ .../test/resources/invalid_definitions.yaml | 19 +++++++++++++++++++ .../config/AirbyteConfigValidator.java | 2 ++ .../ActorDefinitionResourceRequirements.yaml | 7 +++++-- .../resources/types/ResourceRequirements.yaml | 2 +- .../api/generated-api-html/index.html | 4 ++-- 8 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 airbyte-config/init/src/test/resources/invalid_definitions.yaml diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 1a31229c73f54..75cdcf81d3500 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -3490,7 +3490,7 @@ components: ActorDefinitionResourceRequirements: description: actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level. type: object - additionalProperties: true + additionalProperties: false properties: default: "$ref": "#/definitions/ResourceRequirements" @@ -3501,6 +3501,10 @@ components: JobTypeResourceLimit: description: sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set. type: object + additionalProperties: false + required: + - jobType + - resourceRequirements properties: jobType: "$ref": "#/definitions/JobType" diff --git a/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java b/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java index 9c6d32d9e0a4c..0183846766abe 100644 --- a/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java +++ b/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.BooleanNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.io.Resources; import io.airbyte.commons.docker.DockerUtils; @@ -14,6 +15,7 @@ import io.airbyte.commons.util.MoreIterators; import io.airbyte.commons.yaml.Yamls; import io.airbyte.config.AirbyteConfig; +import io.airbyte.config.AirbyteConfigValidator; import io.airbyte.config.ConfigSchema; import io.airbyte.config.ConfigWithMetadata; import io.airbyte.config.persistence.ConfigNotFoundException; @@ -57,7 +59,9 @@ private YamlSeedConfigPersistence(final Class seedResourceClass) throws IOExc final Map fullSourceDefinitionConfigs = sourceDefinitionConfigs.entrySet().stream() .collect(Collectors.toMap(Entry::getKey, e -> { final JsonNode withTombstone = addMissingTombstoneField(e.getValue()); - return mergeSpecIntoDefinition(withTombstone, sourceSpecConfigs); + final JsonNode output = mergeSpecIntoDefinition(withTombstone, sourceSpecConfigs); + AirbyteConfigValidator.AIRBYTE_CONFIG_VALIDATOR.ensureAsRuntime(ConfigSchema.STANDARD_SOURCE_DEFINITION, output); + return output; })); final Map destinationDefinitionConfigs = getConfigs(seedResourceClass, SeedType.STANDARD_DESTINATION_DEFINITION); @@ -65,7 +69,9 @@ private YamlSeedConfigPersistence(final Class seedResourceClass) throws IOExc final Map fullDestinationDefinitionConfigs = destinationDefinitionConfigs.entrySet().stream() .collect(Collectors.toMap(Entry::getKey, e -> { final JsonNode withTombstone = addMissingTombstoneField(e.getValue()); - return mergeSpecIntoDefinition(withTombstone, destinationSpecConfigs); + final JsonNode output = mergeSpecIntoDefinition(withTombstone, destinationSpecConfigs); + AirbyteConfigValidator.AIRBYTE_CONFIG_VALIDATOR.ensureAsRuntime(ConfigSchema.STANDARD_DESTINATION_DEFINITION, output); + return output; })); this.allSeedConfigs = ImmutableMap.>builder() diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 207c1c88fe9ee..12499cb1e79bd 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -188,6 +188,19 @@ dockerImageTag: 0.4.15 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake icon: snowflake.svg + resourceRequirements: + default: + cpu_limit: "1.0" + cpu_request: "1.0" + memory_limit: "300Mi" + memory_request: "300Mi" + jobSpecific: + - jobType: sync + resourceRequirements: + cpu_limit: "0.5" + cpu_request: "0.5" + memory_limit: "600Mi" + memory_request: "600Mi" - name: MariaDB ColumnStore destinationDefinitionId: 294a4790-429b-40ae-9516-49826b9702e1 dockerRepository: airbyte/destination-mariadb-columnstore diff --git a/airbyte-config/init/src/test/resources/invalid_definitions.yaml b/airbyte-config/init/src/test/resources/invalid_definitions.yaml new file mode 100644 index 0000000000000..427fcfe23693d --- /dev/null +++ b/airbyte-config/init/src/test/resources/invalid_definitions.yaml @@ -0,0 +1,19 @@ +- name: Snowflake + destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba + dockerRepository: airbyte/destination-snowflake + dockerImageTag: 0.4.15 + documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake + icon: snowflake.svg + resourceRequirements: + default: + cpu_limit: "1.0" + cpu_request: "1.0" + memory_limit: "300Mi" + memory_request: "300Mi" + jobSpecific: + - jobType: sync + resourceRequirementsTypo: + cpu_limit: "0.5" + cpu_request: "0.5" + memory_limit: "600Mi" + memory_request: "600Mi" diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfigValidator.java b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfigValidator.java index 4acd68d4fd1b0..d587d6ddde4a4 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfigValidator.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfigValidator.java @@ -9,6 +9,8 @@ public class AirbyteConfigValidator extends AbstractSchemaValidator { + public static AirbyteConfigValidator AIRBYTE_CONFIG_VALIDATOR = new AirbyteConfigValidator(); + @Override public Path getSchemaPath(final ConfigSchema configType) { return configType.getConfigSchemaFile().toPath(); diff --git a/airbyte-config/models/src/main/resources/types/ActorDefinitionResourceRequirements.yaml b/airbyte-config/models/src/main/resources/types/ActorDefinitionResourceRequirements.yaml index 2d6c4b8c26853..904d6af92683b 100644 --- a/airbyte-config/models/src/main/resources/types/ActorDefinitionResourceRequirements.yaml +++ b/airbyte-config/models/src/main/resources/types/ActorDefinitionResourceRequirements.yaml @@ -4,7 +4,8 @@ title: ActorDefinitionResourceRequirements description: actor definition specific resource requirements type: object -additionalProperties: true +# set to false because we need the validations on seeds to be strict. otherwise, we will just add whatever is in the seed file into the db. +additionalProperties: false properties: default: description: if set, these are the requirements that should be set for ALL jobs run for this actor definition. @@ -17,8 +18,10 @@ definitions: JobTypeResourceLimit: description: sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set. type: object + # set to false because we need the validations on seeds to be strict. otherwise, we will just add whatever is in the seed file into the db. + additionalProperties: false required: - - jobtype + - jobType - resourceRequirements properties: jobType: diff --git a/airbyte-config/models/src/main/resources/types/ResourceRequirements.yaml b/airbyte-config/models/src/main/resources/types/ResourceRequirements.yaml index 68fdb68eb1089..26f416eba2f82 100644 --- a/airbyte-config/models/src/main/resources/types/ResourceRequirements.yaml +++ b/airbyte-config/models/src/main/resources/types/ResourceRequirements.yaml @@ -4,7 +4,7 @@ title: ResourceRequirements description: generic configuration for pod source requirements type: object -additionalProperties: true +additionalProperties: false properties: # todo (cgardens) - should be camel case for consistency. cpu_request: diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index e62bb1d48cd36..9b44f61746004 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -8999,8 +8999,8 @@

JobType - JobTypeResourceLimit - Up

sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set.
-
jobType (optional)
-
resourceRequirements (optional)
+
jobType
+
resourceRequirements
From df4a429948ae3c3f4f80822d377884aeb9f49b59 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 22 Feb 2022 20:30:04 -0800 Subject: [PATCH 5/7] clena --- .../java/io/airbyte/config/init/YamlSeedConfigPersistence.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java b/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java index 0183846766abe..0603f6f323ac8 100644 --- a/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java +++ b/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.BooleanNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.io.Resources; import io.airbyte.commons.docker.DockerUtils; From 138699fbc133b768f95281e760fee329f91a5e6e Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 23 Feb 2022 11:07:01 -0800 Subject: [PATCH 6/7] remove test file --- .../test/resources/invalid_definitions.yaml | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 airbyte-config/init/src/test/resources/invalid_definitions.yaml diff --git a/airbyte-config/init/src/test/resources/invalid_definitions.yaml b/airbyte-config/init/src/test/resources/invalid_definitions.yaml deleted file mode 100644 index 427fcfe23693d..0000000000000 --- a/airbyte-config/init/src/test/resources/invalid_definitions.yaml +++ /dev/null @@ -1,19 +0,0 @@ -- name: Snowflake - destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.4.15 - documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake - icon: snowflake.svg - resourceRequirements: - default: - cpu_limit: "1.0" - cpu_request: "1.0" - memory_limit: "300Mi" - memory_request: "300Mi" - jobSpecific: - - jobType: sync - resourceRequirementsTypo: - cpu_limit: "0.5" - cpu_request: "0.5" - memory_limit: "600Mi" - memory_request: "600Mi" From 22f2acd8ddde768c1e82b7e08ab3083b7c419360 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 23 Feb 2022 14:21:42 -0800 Subject: [PATCH 7/7] add comments explaining resource requests --- .../init/src/main/resources/seed/destination_definitions.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 12499cb1e79bd..f88a92fd6917b 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -189,12 +189,15 @@ documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake icon: snowflake.svg resourceRequirements: + # this is our first example of setting these requirements. they are guesses, not data driven. + # setting default cpu higher, because we have found that check and discover can be cpu constrained for dbs. default: cpu_limit: "1.0" cpu_request: "1.0" memory_limit: "300Mi" memory_request: "300Mi" jobSpecific: + # sync jobs are generally IO and memory bound and not cpu. - jobType: sync resourceRequirements: cpu_limit: "0.5"