Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ability to inject environment variables globally into launched processes #9329

Merged
merged 9 commits into from
Jan 10, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ public interface Configs {
*/
String getJobMainContainerMemoryLimit();

/**
* Defines a default map of environment variables to use for any launched job containers. The
* expected format is a JSON encoded String -> String map. Make sure to escape properly. Defaults to
* an empty map.
*/
Map<String, String> getJobDefaultEnvMap();

// Jobs - Kube only
/**
* Define one or more Job pod tolerations. Tolerations are separated by ';'. Each toleration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class EnvConfigs implements Configs {
public static final String JOB_MAIN_CONTAINER_CPU_LIMIT = "JOB_MAIN_CONTAINER_CPU_LIMIT";
public static final String JOB_MAIN_CONTAINER_MEMORY_REQUEST = "JOB_MAIN_CONTAINER_MEMORY_REQUEST";
public static final String JOB_MAIN_CONTAINER_MEMORY_LIMIT = "JOB_MAIN_CONTAINER_MEMORY_LIMIT";
public static final String JOB_DEFAULT_ENV_MAP = "JOB_DEFAULT_ENV_MAP";
public static final String JOB_DEFAULT_ENV_PREFIX = "JOB_DEFAULT_ENV_";
private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE";
public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET";
private static final String PUBLISH_METRICS = "PUBLISH_METRICS";
Expand Down Expand Up @@ -121,15 +124,24 @@ public class EnvConfigs implements Configs {
public static final String DEFAULT_NETWORK = "host";

private final Function<String, String> getEnv;
private final Supplier<Set<String>> getAllEnvKeys;
private final LogConfigs logConfigs;
private final CloudStorageConfigs stateStorageCloudConfigs;

/**
* Constructs {@link EnvConfigs} from actual environment variables.
*/
public EnvConfigs() {
this(System::getenv);
this(System.getenv());
}

public EnvConfigs(final Function<String, String> getEnv) {
this.getEnv = getEnv;
/**
* Constructs {@link EnvConfigs} from a provided map. This can be used for testing or getting
* variables from a non-envvar source.
*/
public EnvConfigs(final Map<String, String> envMap) {
this.getEnv = envMap::get;
this.getAllEnvKeys = envMap::keySet;
this.logConfigs = new LogConfigs(getLogConfiguration().orElse(null));
this.stateStorageCloudConfigs = getStateStorageConfiguration().orElse(null);
}
Expand Down Expand Up @@ -481,6 +493,13 @@ public String getJobMainContainerMemoryLimit() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_MEMORY_LIMIT, DEFAULT_JOB_MEMORY_REQUIREMENT);
}

@Override
public Map<String, String> getJobDefaultEnvMap() {
return getAllEnvKeys.get().stream()
.filter(key -> key.startsWith(JOB_DEFAULT_ENV_PREFIX))
.collect(Collectors.toMap(key -> key.replace(JOB_DEFAULT_ENV_PREFIX, ""), getEnv));
}

@Override
public LogConfigs getLogConfigs() {
return logConfigs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,24 @@

package io.airbyte.config;

import static org.mockito.Mockito.when;

import io.airbyte.commons.version.AirbyteVersion;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class EnvConfigsTest {

private Function<String, String> function;
private Map<String, String> envMap;
private EnvConfigs config;

@SuppressWarnings("unchecked")
@BeforeEach
void setUp() {
function = Mockito.mock(Function.class);
config = new EnvConfigs(function);
envMap = new HashMap<>();
config = new EnvConfigs(envMap);
}

@Test
Expand All @@ -35,173 +31,187 @@ void ensureGetEnvBehavior() {

@Test
void testAirbyteRole() {
when(function.apply(EnvConfigs.AIRBYTE_ROLE)).thenReturn(null);
envMap.put(EnvConfigs.AIRBYTE_ROLE, null);
Assertions.assertNull(config.getAirbyteRole());

when(function.apply(EnvConfigs.AIRBYTE_ROLE)).thenReturn("dev");
envMap.put(EnvConfigs.AIRBYTE_ROLE, "dev");
Assertions.assertEquals("dev", config.getAirbyteRole());
}

@Test
void testAirbyteVersion() {
when(function.apply(EnvConfigs.AIRBYTE_VERSION)).thenReturn(null);
envMap.put(EnvConfigs.AIRBYTE_VERSION, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getAirbyteVersion());

when(function.apply(EnvConfigs.AIRBYTE_VERSION)).thenReturn("dev");
envMap.put(EnvConfigs.AIRBYTE_VERSION, "dev");
Assertions.assertEquals(new AirbyteVersion("dev"), config.getAirbyteVersion());
}

@Test
void testWorkspaceRoot() {
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.WORKSPACE_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceRoot());

when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.WORKSPACE_ROOT, "abc/def");
Assertions.assertEquals(Paths.get("abc/def"), config.getWorkspaceRoot());
}

@Test
void testLocalRoot() {
when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.LOCAL_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getLocalRoot());

when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.LOCAL_ROOT, "abc/def");
Assertions.assertEquals(Paths.get("abc/def"), config.getLocalRoot());
}

@Test
void testConfigRoot() {
when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.CONFIG_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getConfigRoot());

when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn("a/b");
envMap.put(EnvConfigs.CONFIG_ROOT, "a/b");
Assertions.assertEquals(Paths.get("a/b"), config.getConfigRoot());
}

@Test
void testGetDatabaseUser() {
when(function.apply(EnvConfigs.DATABASE_USER)).thenReturn(null);
envMap.put(EnvConfigs.DATABASE_USER, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getDatabaseUser());

when(function.apply(EnvConfigs.DATABASE_USER)).thenReturn("user");
envMap.put(EnvConfigs.DATABASE_USER, "user");
Assertions.assertEquals("user", config.getDatabaseUser());
}

@Test
void testGetDatabasePassword() {
when(function.apply(EnvConfigs.DATABASE_PASSWORD)).thenReturn(null);
envMap.put(EnvConfigs.DATABASE_PASSWORD, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getDatabasePassword());

when(function.apply(EnvConfigs.DATABASE_PASSWORD)).thenReturn("password");
envMap.put(EnvConfigs.DATABASE_PASSWORD, "password");
Assertions.assertEquals("password", config.getDatabasePassword());
}

@Test
void testGetDatabaseUrl() {
when(function.apply(EnvConfigs.DATABASE_URL)).thenReturn(null);
envMap.put(EnvConfigs.DATABASE_URL, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getDatabaseUrl());

when(function.apply(EnvConfigs.DATABASE_URL)).thenReturn("url");
envMap.put(EnvConfigs.DATABASE_URL, "url");
Assertions.assertEquals("url", config.getDatabaseUrl());
}

@Test
void testGetWorkspaceDockerMount() {
when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.WORKSPACE_DOCKER_MOUNT, null);
envMap.put(EnvConfigs.WORKSPACE_ROOT, "abc/def");
Assertions.assertEquals("abc/def", config.getWorkspaceDockerMount());

when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn("root");
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.WORKSPACE_DOCKER_MOUNT, "root");
envMap.put(EnvConfigs.WORKSPACE_ROOT, "abc/def");
Assertions.assertEquals("root", config.getWorkspaceDockerMount());

when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.WORKSPACE_DOCKER_MOUNT, null);
envMap.put(EnvConfigs.WORKSPACE_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceDockerMount());
}

@Test
void testGetLocalDockerMount() {
when(function.apply(EnvConfigs.LOCAL_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.LOCAL_DOCKER_MOUNT, null);
envMap.put(EnvConfigs.LOCAL_ROOT, "abc/def");
Assertions.assertEquals("abc/def", config.getLocalDockerMount());

when(function.apply(EnvConfigs.LOCAL_DOCKER_MOUNT)).thenReturn("root");
when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.LOCAL_DOCKER_MOUNT, "root");
envMap.put(EnvConfigs.LOCAL_ROOT, "abc/def");
Assertions.assertEquals("root", config.getLocalDockerMount());

when(function.apply(EnvConfigs.LOCAL_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.LOCAL_DOCKER_MOUNT, null);
envMap.put(EnvConfigs.LOCAL_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getLocalDockerMount());
}

@Test
void testDockerNetwork() {
when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn(null);
envMap.put(EnvConfigs.DOCKER_NETWORK, null);
Assertions.assertEquals("host", config.getDockerNetwork());

when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn("abc");
envMap.put(EnvConfigs.DOCKER_NETWORK, "abc");
Assertions.assertEquals("abc", config.getDockerNetwork());
}

@Test
void testTrackingStrategy() {
when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn(null);
envMap.put(EnvConfigs.TRACKING_STRATEGY, null);
Assertions.assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());

when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn("abc");
envMap.put(EnvConfigs.TRACKING_STRATEGY, "abc");
Assertions.assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());

when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn("logging");
envMap.put(EnvConfigs.TRACKING_STRATEGY, "logging");
Assertions.assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());

when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn("segment");
envMap.put(EnvConfigs.TRACKING_STRATEGY, "segment");
Assertions.assertEquals(Configs.TrackingStrategy.SEGMENT, config.getTrackingStrategy());

when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn("LOGGING");
envMap.put(EnvConfigs.TRACKING_STRATEGY, "LOGGING");
Assertions.assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());
}

@Test
void testworkerKubeTolerations() {
when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn(null);
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, null);
Assertions.assertEquals(config.getJobKubeTolerations(), List.of());

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn(";;;");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, ";;;");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of());

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn("key=k,value=v;");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, "key=k,value=v;");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of());

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn("key=airbyte-server,operator=Exists,effect=NoSchedule");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, "key=airbyte-server,operator=Exists,effect=NoSchedule");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of(new TolerationPOJO("airbyte-server", "NoSchedule", null, "Exists")));

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn("key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, "key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of(new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals")));

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS))
.thenReturn("key=airbyte-server,operator=Exists,effect=NoSchedule;key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS,
"key=airbyte-server,operator=Exists,effect=NoSchedule;key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of(
new TolerationPOJO("airbyte-server", "NoSchedule", null, "Exists"),
new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals")));
}

@Test
void testworkerKubeNodeSelectors() {
when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn(null);
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, null);
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of());

when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn(",,,");
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, ",,,");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of());

when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn("key=k,,;$%&^#");
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("key", "k"));

when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn("one=two");
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "one=two");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("one", "two"));

when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn("airbyte=server,something=nothing");
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testEmptyEnvMapRetrieval() {
Assertions.assertEquals(Map.of(), config.getJobDefaultEnvMap());
}

@Test
void testEnvMapRetrieval() {
envMap.put(EnvConfigs.JOB_DEFAULT_ENV_PREFIX + "ENV1", "VAL1");
envMap.put(EnvConfigs.JOB_DEFAULT_ENV_PREFIX + "ENV2", "VAL\"2WithQuotesand$ymbols");

final var expected = Map.of("ENV1", "VAL1", "ENV2", "VAL\"2WithQuotesand$ymbols");
Assertions.assertEquals(expected, config.getJobDefaultEnvMap());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static void main(final String[] args) throws Exception {
final Map<String, String> envMap =
(Map<String, String>) Jsons.deserialize(Files.readString(Path.of(OrchestratorConstants.INIT_FILE_ENV_MAP)), Map.class);

final Configs configs = new EnvConfigs(envMap::get);
final Configs configs = new EnvConfigs(envMap);

heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT);
heartbeatServer.startBackground();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class WorkerConfigs {
private final String jobSocatImage;
private final String jobBusyboxImage;
private final String jobCurlImage;
private final Map<String, String> envMap;

public WorkerConfigs(final Configs configs) {
this.workerEnvironment = configs.getWorkerEnvironment();
Expand All @@ -36,6 +37,7 @@ public WorkerConfigs(final Configs configs) {
this.jobSocatImage = configs.getJobKubeSocatImage();
this.jobBusyboxImage = configs.getJobKubeBusyboxImage();
this.jobCurlImage = configs.getJobKubeCurlImage();
this.envMap = configs.getJobDefaultEnvMap();
}

public Configs.WorkerEnvironment getWorkerEnvironment() {
Expand Down Expand Up @@ -74,4 +76,8 @@ public String getJobCurlImage() {
return jobCurlImage;
}

public Map<String, String> getEnvMap() {
return envMap;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ public Process create(final String jobId,
"--log-driver",
"none");
}

for (final var envEntry : workerConfigs.getEnvMap().entrySet()) {
cmd.add("-e");
cmd.add(envEntry.getKey() + "=" + envEntry.getValue());
}

if (!Strings.isNullOrEmpty(entrypoint)) {
cmd.add("--entrypoint");
cmd.add(entrypoint);
Expand Down
Loading