Skip to content

Commit 083b5d1

Browse files
New connector: Destination ElasticSearch strict encrypt (#16862)
1 parent 7f2c8be commit 083b5d1

File tree

9 files changed

+351
-1
lines changed

9 files changed

+351
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
*
2+
!Dockerfile
3+
!build
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
FROM airbyte/integration-base-java:dev AS build
2+
3+
WORKDIR /airbyte
4+
5+
ENV APPLICATION destination-elasticsearch-strict-encrypt
6+
7+
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
8+
9+
RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar
10+
11+
FROM airbyte/integration-base-java:dev
12+
13+
WORKDIR /airbyte
14+
15+
ENV APPLICATION destination-elasticsearch-strict-encrypt
16+
17+
COPY --from=build /airbyte /airbyte
18+
19+
LABEL io.airbyte.version=0.1.3
20+
LABEL io.airbyte.name=airbyte/destination-elasticsearch-strict-encrypt
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Elasticsearch Strict Encrypt Test Configuration
2+
3+
In order to test the Elasticsearch destination, you need to have the up and running Elasticsearch that has xpack.security.enabled.
4+
5+
This connector inherits the Elasticsearch destination, but support authorized connections only.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
plugins {
2+
id 'application'
3+
id 'airbyte-docker'
4+
id 'airbyte-integration-test-java'
5+
}
6+
7+
application {
8+
mainClass = 'io.airbyte.integrations.destination.elasticsearch.ElasticsearchStrictEncryptDestination'
9+
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
10+
}
11+
12+
dependencies {
13+
implementation project(':airbyte-config:config-models')
14+
implementation project(':airbyte-protocol:protocol-models')
15+
implementation project(':airbyte-integrations:bases:base-java')
16+
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
17+
18+
implementation 'co.elastic.clients:elasticsearch-java:7.15.0'
19+
implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3'
20+
21+
// EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
22+
// https://eclipse-ee4j.github.io/jsonp/
23+
implementation 'jakarta.json:jakarta.json-api:2.0.1'
24+
25+
// Needed even if using Jackson to have an implementation of the Jsonp object model
26+
// EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
27+
// https://github.com/eclipse-ee4j/jsonp
28+
implementation 'org.glassfish:jakarta.json:2.0.1'
29+
implementation project(':airbyte-integrations:connectors:destination-elasticsearch')
30+
31+
// MIT
32+
// https://www.testcontainers.org/
33+
testImplementation libs.connectors.testcontainers.elasticsearch
34+
integrationTestJavaImplementation libs.connectors.testcontainers.elasticsearch
35+
36+
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
37+
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-elasticsearch')
38+
}
39+
40+
repositories {
41+
maven {
42+
name = "ESSnapshots"
43+
url = "https://snapshots.elastic.co/maven/"
44+
}
45+
maven {
46+
name = "ESJavaGithubPackages"
47+
url = "https://maven.pkg.github.com/elastic/elasticsearch-java"
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.elasticsearch;
6+
7+
import com.fasterxml.jackson.databind.node.ArrayNode;
8+
import io.airbyte.commons.json.Jsons;
9+
import io.airbyte.integrations.base.Destination;
10+
import io.airbyte.integrations.base.IntegrationRunner;
11+
import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination;
12+
import io.airbyte.protocol.models.ConnectorSpecification;
13+
import java.util.stream.IntStream;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
public class ElasticsearchStrictEncryptDestination extends SpecModifyingDestination implements Destination {
18+
19+
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchStrictEncryptDestination.class);
20+
21+
public ElasticsearchStrictEncryptDestination() {
22+
super(new ElasticsearchDestination());
23+
}
24+
25+
public static void main(String[] args) throws Exception {
26+
final var destination = new ElasticsearchStrictEncryptDestination();
27+
LOGGER.info("starting destination: {}", ElasticsearchStrictEncryptDestination.class);
28+
new IntegrationRunner(destination).run(args);
29+
LOGGER.info("completed destination: {}", ElasticsearchStrictEncryptDestination.class);
30+
}
31+
32+
@Override
33+
public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception {
34+
final ConnectorSpecification spec = Jsons.clone(originalSpec);
35+
ArrayNode authMethod = (ArrayNode) spec.getConnectionSpecification().get("properties").get("authenticationMethod").get("oneOf");
36+
IntStream.range(0, authMethod.size()).filter(i -> authMethod.get(i).get("title").asText().equals("None")).findFirst()
37+
.ifPresent(authMethod::remove);
38+
return spec;
39+
}
40+
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.elasticsearch;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import com.google.common.collect.ImmutableMap;
10+
import io.airbyte.commons.json.Jsons;
11+
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
12+
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
13+
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
14+
import java.io.IOException;
15+
import java.util.List;
16+
import java.util.Map;
17+
import org.junit.jupiter.api.AfterAll;
18+
import org.junit.jupiter.api.BeforeAll;
19+
import org.testcontainers.elasticsearch.ElasticsearchContainer;
20+
21+
public class ElasticsearchStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {
22+
23+
private final ObjectMapper mapper = new ObjectMapper();
24+
private static ElasticsearchContainer container;
25+
26+
@BeforeAll
27+
public static void beforeAll() {
28+
29+
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1")
30+
.withPassword("MagicWord");
31+
32+
container.start();
33+
}
34+
35+
@AfterAll
36+
public static void afterAll() {
37+
container.stop();
38+
container.close();
39+
}
40+
41+
@Override
42+
protected String getImageName() {
43+
return "airbyte/destination-elasticsearch-strict-encrypt:dev";
44+
}
45+
46+
@Override
47+
protected int getMaxRecordValueLimit() {
48+
return 2000000;
49+
}
50+
51+
@Override
52+
protected boolean implementsNamespaces() {
53+
return true;
54+
}
55+
56+
@Override
57+
protected boolean supportsNormalization() {
58+
return false;
59+
}
60+
61+
@Override
62+
protected boolean supportBasicDataTypeTest() {
63+
return true;
64+
}
65+
66+
@Override
67+
protected boolean supportArrayDataTypeTest() {
68+
return false;
69+
}
70+
71+
@Override
72+
protected boolean supportObjectDataTypeTest() {
73+
return true;
74+
}
75+
76+
@Override
77+
protected TestDataComparator getTestDataComparator() {
78+
return new AdvancedTestDataComparator();
79+
}
80+
81+
@Override
82+
protected JsonNode getConfig() {
83+
84+
final JsonNode authConfig = Jsons.jsonNode(Map.of(
85+
"method", "basic",
86+
"username", "elastic",
87+
"password", "MagicWord"));
88+
89+
return Jsons.jsonNode(ImmutableMap.builder()
90+
.put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200)))
91+
.put("authenticationMethod", authConfig)
92+
.build());
93+
}
94+
95+
@Override
96+
protected JsonNode getFailCheckConfig() {
97+
// should result in a failed connection check
98+
return mapper.createObjectNode();
99+
}
100+
101+
@Override
102+
protected List<JsonNode> retrieveRecords(DestinationAcceptanceTest.TestDestinationEnv testEnv,
103+
String streamName,
104+
String namespace,
105+
JsonNode streamSchema)
106+
throws IOException {
107+
// Records returned from this method will be compared against records provided to the connector
108+
// to verify they were written correctly
109+
final String indexName = new ElasticsearchWriteConfig()
110+
.setNamespace(namespace)
111+
.setStreamName(streamName)
112+
.getIndexName();
113+
114+
ElasticsearchConnection connection = new ElasticsearchConnection(mapper.convertValue(getConfig(), ConnectorConfiguration.class));
115+
return connection.getRecords(indexName);
116+
}
117+
118+
@Override
119+
protected void setup(DestinationAcceptanceTest.TestDestinationEnv testEnv) {}
120+
121+
@Override
122+
protected void tearDown(DestinationAcceptanceTest.TestDestinationEnv testEnv) {
123+
ElasticsearchConnection connection = new ElasticsearchConnection(mapper.convertValue(getConfig(), ConnectorConfiguration.class));
124+
connection.allIndices().forEach(connection::deleteIndexIfPresent);
125+
}
126+
127+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.elasticsearch;
6+
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
9+
import io.airbyte.commons.json.Jsons;
10+
import io.airbyte.commons.resources.MoreResources;
11+
import io.airbyte.protocol.models.ConnectorSpecification;
12+
import org.junit.jupiter.api.Test;
13+
14+
public class ElasticsearchDestinationStrictEncryptTest {
15+
16+
@Test
17+
void testSpec() throws Exception {
18+
final ConnectorSpecification actual = new ElasticsearchStrictEncryptDestination().spec();
19+
final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class);
20+
21+
assertEquals(expected, actual);
22+
}
23+
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
{
2+
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/elasticsearch",
3+
"supportsIncremental": true,
4+
"supportsNormalization": false,
5+
"supportsNamespaces": true,
6+
"supportsDBT": false,
7+
"supported_destination_sync_modes": ["overwrite", "append"],
8+
"connectionSpecification": {
9+
"$schema": "http://json-schema.org/draft-07/schema#",
10+
"title": "Elasticsearch Connection Configuration",
11+
"type": "object",
12+
"required": ["endpoint"],
13+
"additionalProperties": false,
14+
"properties": {
15+
"endpoint": {
16+
"title": "Server Endpoint",
17+
"type": "string",
18+
"description": "The full url of the Elasticsearch server"
19+
},
20+
"upsert": {
21+
"type": "boolean",
22+
"title": "Upsert Records",
23+
"description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.",
24+
"default": true
25+
},
26+
"authenticationMethod": {
27+
"title": "Authentication Method",
28+
"type": "object",
29+
"description": "The type of authentication to be used",
30+
"oneOf": [
31+
{
32+
"title": "Api Key/Secret",
33+
"additionalProperties": false,
34+
"description": "Use a api key and secret combination to authenticate",
35+
"required": ["method", "apiKeyId", "apiKeySecret"],
36+
"properties": {
37+
"method": {
38+
"type": "string",
39+
"const": "secret"
40+
},
41+
"apiKeyId": {
42+
"title": "API Key ID",
43+
"description": "The Key ID to used when accessing an enterprise Elasticsearch instance.",
44+
"type": "string"
45+
},
46+
"apiKeySecret": {
47+
"title": "API Key Secret",
48+
"description": "The secret associated with the API Key ID.",
49+
"type": "string",
50+
"airbyte_secret": true
51+
}
52+
}
53+
},
54+
{
55+
"title": "Username/Password",
56+
"additionalProperties": false,
57+
"description": "Basic auth header with a username and password",
58+
"required": ["method", "username", "password"],
59+
"properties": {
60+
"method": {
61+
"type": "string",
62+
"const": "basic"
63+
},
64+
"username": {
65+
"title": "Username",
66+
"description": "Basic auth username to access a secure Elasticsearch server",
67+
"type": "string"
68+
},
69+
"password": {
70+
"title": "Password",
71+
"description": "Basic auth password to access a secure Elasticsearch server",
72+
"type": "string",
73+
"airbyte_secret": true
74+
}
75+
}
76+
}
77+
]
78+
}
79+
}
80+
}
81+
}

airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchWriteConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class ElasticsearchWriteConfig {
1919
private List<List<String>> primaryKey;
2020
private boolean upsert;
2121

22-
ElasticsearchWriteConfig() {}
22+
public ElasticsearchWriteConfig() {}
2323

2424
ElasticsearchWriteConfig(
2525
String namespace,

0 commit comments

Comments
 (0)