Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ability to inject environment variables globally into launched processes #9329

Merged
merged 9 commits into from
Jan 10, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ public interface Configs {
*/
String getJobMainContainerMemoryLimit();

/**
 * Defines a default map of environment variables to use for any launched job containers. The
 * expected format is a JSON encoded {@code String -> String} map. Make sure to escape properly.
 * Defaults to an empty map.
 *
 * @return the environment variable names mapped to their values
 */
Map<String, String> getJobDefaultEnvMap();

// Jobs - Kube only
/**
* Define one or more Job pod tolerations. Tolerations are separated by ';'. Each toleration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class EnvConfigs implements Configs {
public static final String JOB_MAIN_CONTAINER_CPU_LIMIT = "JOB_MAIN_CONTAINER_CPU_LIMIT";
public static final String JOB_MAIN_CONTAINER_MEMORY_REQUEST = "JOB_MAIN_CONTAINER_MEMORY_REQUEST";
public static final String JOB_MAIN_CONTAINER_MEMORY_LIMIT = "JOB_MAIN_CONTAINER_MEMORY_LIMIT";
public static final String JOB_DEFAULT_ENV_MAP = "JOB_DEFAULT_ENV_MAP";
private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE";
public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET";
private static final String PUBLISH_METRICS = "PUBLISH_METRICS";
Expand Down Expand Up @@ -481,6 +483,11 @@ public String getJobMainContainerMemoryLimit() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_MEMORY_LIMIT, DEFAULT_JOB_MEMORY_REQUIREMENT);
}

/**
 * Reads the {@link #JOB_DEFAULT_ENV_MAP} environment variable, which is expected to hold a JSON
 * object, and converts it into a map of environment variable names to values. When the variable
 * is not set, the serialized empty map is used as the default, so an empty map is returned.
 *
 * <p>
 * JSON values that are not strings (numbers, booleans, ...) are converted with
 * {@link String#valueOf(Object)} so that the returned map really only contains {@code String}
 * values. JSON {@code null} values are kept as {@code null}.
 *
 * @return a mutable, insertion-ordered map of environment variable names to values
 */
@Override
public Map<String, String> getJobDefaultEnvMap() {
  final String rawJson = getEnvOrDefault(JOB_DEFAULT_ENV_MAP, Jsons.serialize(Map.of()));

  // Deserializing into a raw Map yields whatever value types the JSON contains. Casting that
  // straight to Map<String, String> would compile (unchecked) but defer the failure to a
  // ClassCastException wherever a value is first read as a String, so normalize here instead.
  final Map<?, ?> parsed = Jsons.deserialize(rawJson, Map.class);

  final Map<String, String> envMap = new java.util.LinkedHashMap<>();
  for (final Map.Entry<?, ?> entry : parsed.entrySet()) {
    final Object value = entry.getValue();
    envMap.put(String.valueOf(entry.getKey()), value == null ? null : String.valueOf(value));
  }
  return envMap;
}

@Override
public LogConfigs getLogConfigs() {
return logConfigs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static org.mockito.Mockito.when;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteVersion;
import java.nio.file.Paths;
import java.util.List;
Expand Down Expand Up @@ -204,4 +205,14 @@ void testworkerKubeNodeSelectors() {
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

/**
 * Checks that the JSON stored under {@link EnvConfigs#JOB_DEFAULT_ENV_MAP} round-trips through
 * {@code getJobDefaultEnvMap()}, both for an empty map and for values containing quotes and
 * symbols.
 */
@Test
void testEnvMapRetrieval() {
  // Case 1: an empty JSON object comes back as an empty map.
  final Map<String, String> emptyEnv = Map.of();
  final String emptyJson = Jsons.serialize(emptyEnv);
  when(function.apply(EnvConfigs.JOB_DEFAULT_ENV_MAP)).thenReturn(emptyJson);
  Assertions.assertEquals(emptyEnv, config.getJobDefaultEnvMap());

  // Case 2: entries survive serialization even when a value needs JSON escaping.
  final Map<String, String> populatedEnv = Map.of(
      "ENV1", "VAL1",
      "ENV2", "VAL\"2WithQuotesand$ymbols");
  final String populatedJson = Jsons.serialize(populatedEnv);
  when(function.apply(EnvConfigs.JOB_DEFAULT_ENV_MAP)).thenReturn(populatedJson);
  Assertions.assertEquals(populatedEnv, config.getJobDefaultEnvMap());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class WorkerConfigs {
private final String jobSocatImage;
private final String jobBusyboxImage;
private final String jobCurlImage;
private final Map<String, String> envMap;

public WorkerConfigs(final Configs configs) {
this.workerEnvironment = configs.getWorkerEnvironment();
Expand All @@ -36,6 +37,7 @@ public WorkerConfigs(final Configs configs) {
this.jobSocatImage = configs.getJobKubeSocatImage();
this.jobBusyboxImage = configs.getJobKubeBusyboxImage();
this.jobCurlImage = configs.getJobKubeCurlImage();
this.envMap = configs.getJobDefaultEnvMap();
}

public Configs.WorkerEnvironment getWorkerEnvironment() {
Expand Down Expand Up @@ -74,4 +76,8 @@ public String getJobCurlImage() {
return jobCurlImage;
}

/**
 * Returns the default environment variables for launched job containers, as read from
 * {@link Configs#getJobDefaultEnvMap()} when this object was constructed. The stored map
 * reference is returned as-is, not copied.
 *
 * @return the environment variable names mapped to their values
 */
public Map<String, String> getEnvMap() {
return envMap;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ public Process create(final String jobId,
"--log-driver",
"none");
}

for (final var envEntry : workerConfigs.getEnvMap().entrySet()) {
cmd.add("-e");
cmd.add(envEntry.getKey() + "=" + envEntry.getValue());
}

if (!Strings.isNullOrEmpty(entrypoint)) {
cmd.add("--entrypoint");
cmd.add(entrypoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
Expand Down Expand Up @@ -134,7 +135,6 @@ public class KubePodProcess extends Process {
private final Duration statusCheckInterval;
private final int stdoutLocalPort;
private final ServerSocket stderrServerSocket;
private final Map<Integer, Integer> internalToExternalPorts;
private final int stderrLocalPort;
private final ExecutorService executorService;

Expand Down Expand Up @@ -173,6 +173,7 @@ private static Container getMain(final String image,
final List<VolumeMount> mainVolumeMounts,
final ResourceRequirements resourceRequirements,
final Map<Integer, Integer> internalToExternalPorts,
final Map<String, String> envMap,
final String[] args)
throws IOException {
final var argsStr = String.join(" ", args);
Expand All @@ -196,12 +197,17 @@ private static Container getMain(final String image,
.build())
.collect(Collectors.toList());

final List<EnvVar> envVars = envMap.entrySet().stream()
.map(entry -> new EnvVar(entry.getKey(), entry.getValue(), null))
.collect(Collectors.toList());

final ContainerBuilder containerBuilder = new ContainerBuilder()
.withName("main")
.withPorts(containerPorts)
.withImage(image)
.withImagePullPolicy(imagePullPolicy)
.withCommand("sh", "-c", mainCommand)
.withEnv(envVars)
.withWorkingDir(CONFIG_DIR)
.withVolumeMounts(mainVolumeMounts);

Expand Down Expand Up @@ -333,17 +339,16 @@ public KubePodProcess(final boolean isOrchestrator,
final String socatImage,
final String busyboxImage,
final String curlImage,
final Map<String, String> envMap,
final Map<Integer, Integer> internalToExternalPorts,
final String... args)
throws IOException, InterruptedException {
this.fabricClient = fabricClient;
this.statusCheckInterval = statusCheckInterval;
this.stdoutLocalPort = stdoutLocalPort;
this.stderrLocalPort = stderrLocalPort;

this.stdoutServerSocket = new ServerSocket(stdoutLocalPort);
this.stderrServerSocket = new ServerSocket(stderrLocalPort);
this.internalToExternalPorts = internalToExternalPorts;
this.executorService = Executors.newFixedThreadPool(2);
setupStdOutAndStdErrListeners();

Expand Down Expand Up @@ -394,6 +399,7 @@ public KubePodProcess(final boolean isOrchestrator,
List.of(pipeVolumeMount, configVolumeMount, terminationVolumeMount),
resourceRequirements,
internalToExternalPorts,
envMap,
args);

final io.fabric8.kubernetes.api.model.ResourceRequirements sidecarResources = getResourceRequirementsBuilder(DEFAULT_SIDECAR_RESOURCES).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public Process create(final String jobId,
workerConfigs.getJobSocatImage(),
workerConfigs.getJobBusyboxImage(),
workerConfigs.getJobCurlImage(),
workerConfigs.getEnvMap(),
internalToExternalPorts,
args);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class OrchestratorConstants {
EnvConfigs.JOB_MAIN_CONTAINER_CPU_LIMIT,
EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_REQUEST,
EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_LIMIT,
EnvConfigs.JOB_DEFAULT_ENV_MAP,
EnvConfigs.LOCAL_ROOT);

public static final String INIT_FILE_ENV_MAP = "envMap.json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -55,6 +57,9 @@ public class KubePodProcessIntegrationTest {
private static KubernetesClient fabricClient;
private static KubeProcessFactory processFactory;
private static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = new WorkerConfigs(new EnvConfigs()).getResourceRequirements();
private static final String ENV_KEY = "ENV_VAR_1";
private static final String ENV_VALUE = "ENV_VALUE_1";
private static final Map<String, String> ENV_MAP = ImmutableMap.of(ENV_KEY, ENV_VALUE);

private WorkerHeartbeatServer server;

Expand All @@ -69,8 +74,18 @@ public static void init() throws Exception {
fabricClient = new DefaultKubernetesClient();

KubePortManagerSingleton.init(new HashSet<>(openPorts.subList(1, openPorts.size() - 1)));

final WorkerConfigs workerConfigs = spy(new WorkerConfigs(new EnvConfigs()));
when(workerConfigs.getEnvMap()).thenReturn(Map.of("ENV_VAR_1", "ENV_VALUE_1"));

processFactory =
new KubeProcessFactory(new WorkerConfigs(new EnvConfigs()), "default", fabricClient, heartbeatUrl, getHost(), false,
new KubeProcessFactory(
workerConfigs,
"default",
fabricClient,
heartbeatUrl,
getHost(),
false,
Duration.ofSeconds(1));
}

Expand Down Expand Up @@ -193,6 +208,19 @@ public void testSuccessfulSpawningWithQuotes() throws Exception {
assertEquals(0, process.exitValue());
}

/**
 * Verifies that the environment variable stubbed onto the worker configs of the process factory
 * is visible inside the launched process.
 */
@Test
public void testEnvMapSet() throws Exception {
  // start a finite process that echoes the injected variable
  final Process process = getProcess("echo " + ENV_KEY + "=$" + ENV_KEY);

  // decode explicitly as UTF-8 instead of relying on the platform default charset
  final var output = new String(process.getInputStream().readAllBytes(), java.nio.charset.StandardCharsets.UTF_8);
  assertEquals(ENV_KEY + "=" + ENV_VALUE + "\n", output);
  process.waitFor();

  // the pod should be dead and in a good state
  assertFalse(process.isAlive());
  assertEquals(0, process.exitValue());
}

@Test
public void testPipeInEntrypoint() throws Exception {
// start a process that has a pipe in the entrypoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ void check() throws WorkerException {
launcher.check(JOB_ROOT, "config", "{}");

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
workerConfigs.getResourceRequirements(), Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.CHECK_JOB), Map.of(),
workerConfigs.getResourceRequirements(),
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.CHECK_JOB),
Map.of(),
"check",
"--config", "config");
}
Expand All @@ -67,7 +69,9 @@ void discover() throws WorkerException {
launcher.discover(JOB_ROOT, "config", "{}");

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
workerConfigs.getResourceRequirements(), Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.DISCOVER_JOB), Map.of(),
workerConfigs.getResourceRequirements(),
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.DISCOVER_JOB),
Map.of(),
"discover",
"--config", "config");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
Expand Down Expand Up @@ -87,4 +89,50 @@ public void testFileWriting(boolean isOrchestrator) throws IOException, WorkerEx
Jsons.deserialize(IOs.readFile(jobRoot, "config.json")));
}

/**
 * Verifies that environment variables supplied through {@link WorkerConfigs#getEnvMap()} are
 * visible inside a process created by {@link DockerProcessFactory}.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEnvMapSet(boolean isOrchestrator) throws IOException, WorkerException {
  final Path testRoot = Files.createDirectories(TEST_ROOT);
  final Path workspace = Files.createTempDirectory(testRoot, "process_factory");
  final Path jobDir = workspace.resolve("job");

  // Only the env map is stubbed; every other value comes from the real EnvConfigs-backed object.
  final WorkerConfigs configsWithEnv = spy(new WorkerConfigs(new EnvConfigs()));
  when(configsWithEnv.getEnvMap()).thenReturn(Map.of("ENV_VAR_1", "ENV_VALUE_1"));

  final DockerProcessFactory factory =
      new DockerProcessFactory(configsWithEnv, workspace, "", "", "host", isOrchestrator);

  // Run a shell that echoes the injected variable.
  final Process echoProcess = factory.create(
      "job_id",
      0,
      jobDir,
      "busybox",
      false,
      Map.of(),
      "/bin/sh",
      configsWithEnv.getResourceRequirements(),
      Map.of(),
      Map.of(),
      "-c",
      "echo ENV_VAR_1=$ENV_VAR_1");

  // Collect both output streams so failures can report what the process printed.
  final StringBuilder stdout = new StringBuilder();
  final StringBuilder stderr = new StringBuilder();
  LineGobbler.gobble(echoProcess.getInputStream(), stdout::append);
  LineGobbler.gobble(echoProcess.getErrorStream(), stderr::append);

  WorkerUtils.gentleClose(new WorkerConfigs(new EnvConfigs()), echoProcess, 20, TimeUnit.SECONDS);

  assertEquals(0, echoProcess.exitValue(),
      String.format("Process failed with stdout: %s and stderr: %s", stdout, stderr));
  assertEquals("ENV_VAR_1=ENV_VALUE_1", stdout.toString(),
      String.format("Output did not contain the expected string. stdout: %s", stdout));
}

}