From ee5d897078a540cf6af05e8c15597d42f4e016fa Mon Sep 17 00:00:00 2001 From: jrthe42 Date: Fri, 25 Dec 2020 16:17:08 +0800 Subject: [PATCH 1/2] support routing field for es connector --- .../table/ElasticsearchConfiguration.java | 4 + .../table/ElasticsearchOptions.java | 5 ++ .../elasticsearch/table/RoutingExtractor.java | 65 +++++++++++++++ .../table/RowElasticsearchSinkFunction.java | 16 +++- .../flink-connector-elasticsearch6/pom.xml | 2 +- .../table/Elasticsearch6DynamicSink.java | 4 +- .../Elasticsearch6DynamicSinkFactory.java | 2 + .../Elasticsearch6DynamicSinkITCase.java | 79 +++++++++++++++++++ .../flink-connector-elasticsearch7/pom.xml | 2 +- .../table/Elasticsearch7DynamicSink.java | 4 +- .../Elasticsearch7DynamicSinkFactory.java | 2 + .../Elasticsearch7DynamicSinkITCase.java | 74 +++++++++++++++++ 12 files changed, 254 insertions(+), 5 deletions(-) create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RoutingExtractor.java diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java index e1ffab778869f..40e96945d84fc 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java @@ -145,6 +145,10 @@ public String getKeyDelimiter() { return config.get(ElasticsearchOptions.KEY_DELIMITER_OPTION); } + public Optional getRoutingField() { + return config.getOptional(ElasticsearchOptions.ROUTING_FILED_NAME); + } + public Optional getPathPrefix() { return config.getOptional(ElasticsearchOptions.CONNECTION_PATH_PREFIX); } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java index 7f437f95e9419..85cd50147263e 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java @@ -75,6 +75,11 @@ public enum BackOffType { .defaultValue("_") .withDescription( "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\"."); + public static final ConfigOption ROUTING_FILED_NAME = + ConfigOptions.key("routing.filed-name") + .stringType() + .noDefaultValue() + .withDescription("Elasticsearch routing filed."); public static final ConfigOption FAILURE_HANDLER_OPTION = ConfigOptions.key("failure-handler") .stringType() diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RoutingExtractor.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RoutingExtractor.java new file mode 100644 index 0000000000000..76a102200a5f8 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RoutingExtractor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.List; +import java.util.function.Function; + +/** + * An extractor for a Elasticsearch routing from a {@link RowData}. + */ +@Internal +public class RoutingExtractor { + private RoutingExtractor() { + } + + public static Function createRoutingExtractor( + TableSchema schema, + @Nullable String filedName) { + if (filedName == null) { + return null; + } + List tableColumns = schema.getTableColumns(); + for (int i = 0; i < schema.getFieldCount(); i++) { + TableColumn column = tableColumns.get(i); + if (column.getName().equals(filedName)) { + RowData.FieldGetter fieldGetter = RowData.createFieldGetter( + column.getType().getLogicalType(), + i); + return (Function & Serializable) (row) -> { + Object fieldOrNull = fieldGetter.getFieldOrNull(row); + if (fieldOrNull != null) { + return fieldOrNull.toString(); + } else { + return null; + } + }; + } + } + throw new IllegalArgumentException("Filed " + filedName + " not exist in table schema."); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java index 7fb1e4d4c1529..78c591c967abc 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -50,6 +51,7 @@ class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction private final XContentType contentType; private final RequestFactory requestFactory; private final Function createKey; + private final Function createRouting; public RowElasticsearchSinkFunction( IndexGenerator indexGenerator, @@ -57,13 +59,15 @@ public RowElasticsearchSinkFunction( SerializationSchema serializationSchema, XContentType contentType, RequestFactory requestFactory, - Function createKey) { + Function createKey, + @Nullable Function createRouting) { this.indexGenerator = Preconditions.checkNotNull(indexGenerator); this.docType = docType; this.serializationSchema = Preconditions.checkNotNull(serializationSchema); this.contentType = Preconditions.checkNotNull(contentType); this.requestFactory = Preconditions.checkNotNull(requestFactory); this.createKey = Preconditions.checkNotNull(createKey); + this.createRouting = createRouting; } @Override @@ -94,11 +98,13 @@ private void processUpsert(RowData row, RequestIndexer indexer) { final UpdateRequest updateRequest = requestFactory.createUpdateRequest( indexGenerator.generate(row), docType, key, contentType, document); + addRouting(updateRequest, row); indexer.add(updateRequest); } else { final IndexRequest indexRequest = requestFactory.createIndexRequest( indexGenerator.generate(row), docType, key, contentType, document); + addRouting(indexRequest, row); indexer.add(indexRequest); } } @@ -107,9 +113,17 @@ private void processDelete(RowData row, RequestIndexer indexer) { final String key = createKey.apply(row); final DeleteRequest deleteRequest = requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key); + addRouting(deleteRequest, row); indexer.add(deleteRequest); } + private void addRouting(DocWriteRequest request, RowData row) { + if (null != createRouting) { + String routing = createRouting.apply(row); + request.routing(routing); + } + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml index 875a3c9d71638..85bf6796fa1b2 100644 --- a/flink-connectors/flink-connector-elasticsearch6/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml @@ -86,7 +86,7 @@ under the License. org.testcontainers elasticsearch - 1.15.0 + 1.15.1 test diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java index 508a626e50600..ed7944438beeb 100644 --- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java @@ -125,7 +125,9 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, REQUEST_FACTORY, - KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())); + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), + RoutingExtractor.createRoutingExtractor( + schema, config.getRoutingField().orElse(null))); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java index 6fc4030c171a3..b1e402f94125a 100644 --- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java @@ -54,6 +54,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.ROUTING_FILED_NAME; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION; /** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}. */ @@ -64,6 +65,7 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory private static final Set> optionalOptions = Stream.of( KEY_DELIMITER_OPTION, + ROUTING_FILED_NAME, FAILURE_HANDLER_OPTION, FLUSH_ON_CHECKPOINT_OPTION, BULK_FLASH_MAX_SIZE_OPTION, diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java index cb8457b93499a..a734cd1aceec4 100644 --- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java +++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.types.RowKind; import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -369,6 +370,84 @@ public void testWritingDocumentsWithDynamicIndex() throws Exception { assertThat(response, equalTo(expectedMap)); } + @Test + public void testWritingDocumentsWithRouting() throws Exception { + TableSchema schema = + TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.TIME()) + .field("c", DataTypes.STRING().notNull()) + .field("d", DataTypes.FLOAT()) + .field("e", DataTypes.TINYINT().notNull()) + .field("f", DataTypes.DATE()) + .field("g", DataTypes.TIMESTAMP().notNull()) + .primaryKey("a", "g") + .build(); + GenericRowData rowData = + GenericRowData.of( + 1L, + 12345, + StringData.fromString("ABCDE"), + 12.12f, + (byte) 2, + 12345, + TimestampData.fromLocalDateTime( + LocalDateTime.parse("2012-12-12T12:12:12"))); + + String index = "writing-documents"; + String myType = "MyType"; + Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + SinkFunctionProvider sinkRuntimeProvider = + (SinkFunctionProvider) + sinkFactory + .createDynamicTableSink( + context() + .withSchema(schema) + .withOption( + ElasticsearchOptions.INDEX_OPTION.key(), + index) + .withOption( + ElasticsearchOptions.DOCUMENT_TYPE_OPTION + .key(), + myType) + .withOption( + ElasticsearchOptions.ROUTING_FILED_NAME + .key(), + "c") + .withOption( + ElasticsearchOptions.HOSTS_OPTION.key(), + elasticsearchContainer.getHttpHostAddress()) + .withOption( + ElasticsearchOptions + .FLUSH_ON_CHECKPOINT_OPTION + .key(), + "false") + .build()) + .getSinkRuntimeProvider(new MockContext()); + + SinkFunction sinkFunction = sinkRuntimeProvider.createSinkFunction(); + StreamExecutionEnvironment environment = + StreamExecutionEnvironment.getExecutionEnvironment(); + rowData.setRowKind(RowKind.UPDATE_AFTER); + environment.fromElements(rowData).addSink(sinkFunction); + environment.execute(); + + Client client = getClient(); + GetResponse response = + client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12").routing("ABCDE")) + .actionGet(); + Map expectedMap = new HashMap<>(); + expectedMap.put("a", 1); + expectedMap.put("b", "00:00:12"); + expectedMap.put("c", "ABCDE"); + expectedMap.put("d", 12.12d); + expectedMap.put("e", 2); + expectedMap.put("f", "2003-10-20"); + expectedMap.put("g", "2012-12-12 12:12:12"); + assertThat(response.getSource(), equalTo(expectedMap)); + } + private static class MockContext implements DynamicTableSink.Context { @Override public boolean isBounded() { diff --git a/flink-connectors/flink-connector-elasticsearch7/pom.xml b/flink-connectors/flink-connector-elasticsearch7/pom.xml index bf213f62d172a..f12f68fb5bbb0 100644 --- a/flink-connectors/flink-connector-elasticsearch7/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch7/pom.xml @@ -86,7 +86,7 @@ under the License. org.testcontainers elasticsearch - 1.15.0 + 1.15.1 test diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java index 0fe2dac4d9ac2..813c56b427e07 100644 --- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java +++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java @@ -126,7 +126,9 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, REQUEST_FACTORY, - KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())); + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), + RoutingExtractor.createRoutingExtractor( + schema, config.getRoutingField().orElse(null))); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java index 9634419409174..1d02b299bef80 100644 --- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java +++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java @@ -53,6 +53,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.ROUTING_FILED_NAME; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION; /** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}. */ @@ -63,6 +64,7 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory private static final Set> optionalOptions = Stream.of( KEY_DELIMITER_OPTION, + ROUTING_FILED_NAME, FAILURE_HANDLER_OPTION, FLUSH_ON_CHECKPOINT_OPTION, BULK_FLASH_MAX_SIZE_OPTION, diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java index b92f34e8c7cf2..a83a19d943250 100644 --- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java +++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.types.RowKind; import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -346,6 +347,79 @@ public void testWritingDocumentsWithDynamicIndex() throws Exception { assertThat(response, equalTo(expectedMap)); } + @Test + public void testWritingDocumentsWithRouting() throws Exception { + TableSchema schema = + TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.TIME()) + .field("c", DataTypes.STRING().notNull()) + .field("d", DataTypes.FLOAT()) + .field("e", DataTypes.TINYINT().notNull()) + .field("f", DataTypes.DATE()) + .field("g", DataTypes.TIMESTAMP().notNull()) + .primaryKey("a", "g") + .build(); + GenericRowData rowData = + GenericRowData.of( + 1L, + 12345, + StringData.fromString("ABCDE"), + 12.12f, + (byte) 2, + 12345, + TimestampData.fromLocalDateTime( + LocalDateTime.parse("2012-12-12T12:12:12"))); + + String index = "writing-documents"; + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + SinkFunctionProvider sinkRuntimeProvider = + (SinkFunctionProvider) + sinkFactory + .createDynamicTableSink( + context() + .withSchema(schema) + .withOption( + ElasticsearchOptions.INDEX_OPTION.key(), + index) + .withOption( + ElasticsearchOptions.ROUTING_FILED_NAME + .key(), + "c") + .withOption( + ElasticsearchOptions.HOSTS_OPTION.key(), + elasticsearchContainer.getHttpHostAddress()) + .withOption( + ElasticsearchOptions + .FLUSH_ON_CHECKPOINT_OPTION + .key(), + "false") + .build()) + .getSinkRuntimeProvider(new MockContext()); + + SinkFunction sinkFunction = sinkRuntimeProvider.createSinkFunction(); + StreamExecutionEnvironment environment = + StreamExecutionEnvironment.getExecutionEnvironment(); + rowData.setRowKind(RowKind.UPDATE_AFTER); + environment.fromElements(rowData).addSink(sinkFunction); + environment.execute(); + + Client client = getClient(); + GetResponse response = + client.get(new GetRequest(index, "1_2012-12-12T12:12:12").routing("ABCDE")) + .actionGet(); + Map expectedMap = new HashMap<>(); + expectedMap.put("a", 1); + expectedMap.put("b", "00:00:12"); + expectedMap.put("c", "ABCDE"); + expectedMap.put("d", 12.12d); + expectedMap.put("e", 2); + expectedMap.put("f", "2003-10-20"); + expectedMap.put("g", "2012-12-12 12:12:12"); + assertThat(response.getSource(), equalTo(expectedMap)); + } + private static class MockContext implements DynamicTableSink.Context { @Override public boolean isBounded() { From 877575f48fc2b602a34213c3b9d31e850fe5b130 Mon Sep 17 00:00:00 2001 From: jrthe42 Date: Fri, 25 Dec 2020 17:10:00 +0800 Subject: [PATCH 2/2] add doc for es routing field --- docs/dev/table/connectors/elasticsearch.md | 7 +++++++ docs/dev/table/connectors/elasticsearch.zh.md | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/docs/dev/table/connectors/elasticsearch.md b/docs/dev/table/connectors/elasticsearch.md index c7cabaf249fc8..54f9babececdd 100644 --- a/docs/dev/table/connectors/elasticsearch.md +++ b/docs/dev/table/connectors/elasticsearch.md @@ -124,6 +124,13 @@ Connector Options String Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." + +
routing.filed-name
+ optional + (none) + String + Using field value in the record to dynamically generate routing filed. +
username
optional diff --git a/docs/dev/table/connectors/elasticsearch.zh.md b/docs/dev/table/connectors/elasticsearch.zh.md index 37bc50c05b815..5ebc4252deeea 100644 --- a/docs/dev/table/connectors/elasticsearch.zh.md +++ b/docs/dev/table/connectors/elasticsearch.zh.md @@ -124,6 +124,13 @@ Connector Options String Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." + +
routing.filed-name
+ optional + (none) + String + Using field value in the record to dynamically generate routing filed. +
username
optional