Skip to content

Commit efb5151

Browse files
authored
šŸ› Make all JDBC destinations (SF, RS, PG, MySQL, MSSQL, Oracle) handle wide rows by using byte-based record buffering (#7719)
1 parent 46fa44a commit efb5151

File tree

28 files changed

+126
-52
lines changed

28 files changed

+126
-52
lines changed

ā€Žairbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
8383
private final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord;
8484
private final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount;
8585
private final Consumer<AirbyteMessage> outputRecordCollector;
86-
private final int queueBatchSize;
86+
private final long maxQueueSizeInBytes;
87+
private long bufferSizeInBytes;
8788

8889
private boolean hasStarted;
8990
private boolean hasClosed;
@@ -97,9 +98,9 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
9798
final CheckedConsumer<Boolean, Exception> onClose,
9899
final ConfiguredAirbyteCatalog catalog,
99100
final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord,
100-
final int queueBatchSize) {
101+
final long maxQueueSizeInBytes) {
101102
this.outputRecordCollector = outputRecordCollector;
102-
this.queueBatchSize = queueBatchSize;
103+
this.maxQueueSizeInBytes = maxQueueSizeInBytes;
103104
this.hasStarted = false;
104105
this.hasClosed = false;
105106
this.onStart = onStart;
@@ -108,8 +109,8 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
108109
this.catalog = catalog;
109110
this.streamNames = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
110111
this.isValidRecord = isValidRecord;
111-
this.buffer = new ArrayList<>(queueBatchSize);
112-
112+
this.buffer = new ArrayList<>(10_000);
113+
this.bufferSizeInBytes = 0;
113114
this.pairToIgnoredRecordCount = new HashMap<>();
114115
}
115116

@@ -141,11 +142,18 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
141142
return;
142143
}
143144

144-
buffer.add(message);
145-
146-
if (buffer.size() == queueBatchSize) {
145+
// TODO use a more efficient way to compute bytes that doesn't require double serialization (records are serialized again when writing to
146+
// the destination
147+
// TODO use a smarter way of estimating byte size rather than always multiply by two
148+
long messageSizeInBytes = Jsons.serialize(recordMessage.getData()).length() * 2; // Strings serialize to UTF-8 by default
149+
if (bufferSizeInBytes + messageSizeInBytes >= maxQueueSizeInBytes) {
147150
flushQueueToDestination();
151+
bufferSizeInBytes = 0;
148152
}
153+
154+
buffer.add(message);
155+
bufferSizeInBytes += messageSizeInBytes;
156+
149157
} else if (message.getType() == Type.STATE) {
150158
pendingState = message;
151159
} else {

ā€Žairbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ void setup() throws Exception {
8484
onClose,
8585
CATALOG,
8686
isValidRecord,
87-
10);
87+
10000);
8888

8989
when(isValidRecord.apply(any())).thenReturn(true);
9090
}
@@ -167,7 +167,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception
167167
onClose,
168168
CATALOG,
169169
isValidRecord,
170-
20);
170+
10000);
171171

172172
consumer.start();
173173
consumeRecords(consumer, expectedRecordsBatch1);

ā€Žairbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ public void testCustomDbtTransformations() throws Exception {
731731
final OperatorDbt dbtConfig = new OperatorDbt()
732732
.withGitRepoUrl("https://github.com/fishtown-analytics/jaffle_shop.git")
733733
.withGitRepoBranch("main")
734-
.withDockerImage("airbyte/normalization:dev");
734+
.withDockerImage(NormalizationRunnerFactory.getNormalizationInfoForConnector(getImageName()).getLeft());
735735
//
736736
// jaffle_shop is a fictional ecommerce store maintained by fishtownanalytics/dbt.
737737
//

ā€Žairbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAirbyteMessageConsumerFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
public class ElasticsearchAirbyteMessageConsumerFactory {
2828

2929
private static final Logger log = LoggerFactory.getLogger(ElasticsearchAirbyteMessageConsumerFactory.class);
30-
private static final int MAX_BATCH_SIZE = 10000;
30+
private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4 ; // 256mib
3131
private static final ObjectMapper mapper = new ObjectMapper();
3232

3333
private static AtomicLong recordsWritten = new AtomicLong(0);
@@ -50,7 +50,7 @@ public static AirbyteMessageConsumer create(Consumer<AirbyteMessage> outputRecor
5050
onCloseFunction(connection),
5151
catalog,
5252
isValidFunction(connection),
53-
MAX_BATCH_SIZE);
53+
MAX_BATCH_SIZE_BYTES);
5454
}
5555

5656
// is there any json node that wont fit in the index?

ā€Žairbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class JdbcBufferedConsumerFactory {
4343

4444
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class);
4545

46-
private static final int MAX_BATCH_SIZE = 10000;
46+
private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mib
4747

4848
public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
4949
final JdbcDatabase database,
@@ -60,7 +60,7 @@ public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outpu
6060
onCloseFunction(database, sqlOperations, writeConfigs),
6161
catalog,
6262
sqlOperations::isValidData,
63-
MAX_BATCH_SIZE);
63+
MAX_BATCH_SIZE_BYTES);
6464
}
6565

6666
private static List<WriteConfig> createWriteConfigs(final NamingConventionTransformer namingResolver,

ā€Žairbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java

+20-15
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.integrations.destination.jdbc;
66

77
import com.google.common.annotations.VisibleForTesting;
8+
import com.google.common.collect.Iterables;
89
import io.airbyte.commons.json.Jsons;
910
import io.airbyte.db.jdbc.JdbcDatabase;
1011
import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -78,23 +79,27 @@ static void insertRawRecordsInSingleQuery(final String insertQueryComponent,
7879
// string. Thus there will be two loops below.
7980
// 1) Loop over records to build the full string.
8081
// 2) Loop over the records and bind the appropriate values to the string.
81-
final StringBuilder sql = new StringBuilder(insertQueryComponent);
82-
records.forEach(r -> sql.append(recordQueryComponent));
83-
final String s = sql.toString();
84-
final String s1 = s.substring(0, s.length() - 2) + (sem ? ";" : "");
82+
// We also partition the query to run on 10k records at a time, since some DBs set a max limit on how many records can be inserted at once
83+
// TODO(sherif) this should use a smarter, destination-aware partitioning scheme instead of 10k by default
84+
for (List<AirbyteRecordMessage> partition : Iterables.partition(records, 10_000)){
85+
final StringBuilder sql = new StringBuilder(insertQueryComponent);
86+
partition.forEach(r -> sql.append(recordQueryComponent));
87+
final String s = sql.toString();
88+
final String s1 = s.substring(0, s.length() - 2) + (sem ? ";" : "");
8589

86-
try (final PreparedStatement statement = connection.prepareStatement(s1)) {
87-
// second loop: bind values to the SQL string.
88-
int i = 1;
89-
for (final AirbyteRecordMessage message : records) {
90-
// 1-indexed
91-
statement.setString(i, uuidSupplier.get().toString());
92-
statement.setString(i + 1, Jsons.serialize(message.getData()));
93-
statement.setTimestamp(i + 2, Timestamp.from(Instant.ofEpochMilli(message.getEmittedAt())));
94-
i += 3;
95-
}
90+
try (final PreparedStatement statement = connection.prepareStatement(s1)) {
91+
// second loop: bind values to the SQL string.
92+
int i = 1;
93+
for (final AirbyteRecordMessage message : partition) {
94+
// 1-indexed
95+
statement.setString(i, uuidSupplier.get().toString());
96+
statement.setString(i + 1, Jsons.serialize(message.getData()));
97+
statement.setTimestamp(i + 2, Timestamp.from(Instant.ofEpochMilli(message.getEmittedAt())));
98+
i += 3;
99+
}
96100

97-
statement.execute();
101+
statement.execute();
102+
}
98103
}
99104
});
100105
}

ā€Žairbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class CopyConsumerFactory {
2929

3030
private static final Logger LOGGER = LoggerFactory.getLogger(CopyConsumerFactory.class);
3131

32-
private static final int MAX_BATCH_SIZE = 10000;
32+
private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4 ; // 256 mib
3333

3434
public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
3535
final JdbcDatabase database,
@@ -56,7 +56,7 @@ public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> o
5656
onCloseFunction(pairToCopier, database, sqlOperations, pairToIgnoredRecordCount),
5757
catalog,
5858
sqlOperations::isValidData,
59-
MAX_BATCH_SIZE);
59+
MAX_BATCH_SIZE_BYTES);
6060
}
6161

6262
private static <T> Map<AirbyteStreamNameNamespacePair, StreamCopier> createWriteConfigs(final ExtendedNameTransformer namingResolver,

ā€Žairbyte-integrations/connectors/destination-meilisearch/src/main/java/io/airbyte/integrations/destination/meilisearch/MeiliSearchDestination.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class MeiliSearchDestination extends BaseConnector implements Destination
6464

6565
private static final Logger LOGGER = LoggerFactory.getLogger(MeiliSearchDestination.class);
6666

67-
private static final int MAX_BATCH_SIZE = 10000;
67+
private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4 ; //256mib
6868
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS");
6969

7070
public static final String AB_PK_COLUMN = "_ab_pk";
@@ -102,7 +102,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
102102
(hasFailed) -> LOGGER.info("Completed writing to MeiliSearch. Status: {}", hasFailed ? "FAILED" : "SUCCEEDED"),
103103
catalog,
104104
(data) -> true,
105-
MAX_BATCH_SIZE);
105+
MAX_BATCH_SIZE_BYTES);
106106
}
107107

108108
private static Map<String, Index> createIndices(final ConfiguredAirbyteCatalog catalog, final Client client) throws Exception {

ā€Žairbyte-integrations/connectors/destination-mssql-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
77

88
RUN tar xf ${APPLICATION}.tar --strip-components=1
99

10-
LABEL io.airbyte.version=0.1.0
10+
LABEL io.airbyte.version=0.1.1
1111
LABEL io.airbyte.name=airbyte/destination-mssql-strict-encrypt

ā€Žairbyte-integrations/connectors/destination-mssql/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.10
11+
LABEL io.airbyte.version=0.1.11
1212
LABEL io.airbyte.name=airbyte/destination-mssql

ā€Žairbyte-integrations/connectors/destination-mysql-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.0
11+
LABEL io.airbyte.version=0.1.1
1212
LABEL io.airbyte.name=airbyte/destination-mysql-strict-encrypt

ā€Žairbyte-integrations/connectors/destination-mysql/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.13
11+
LABEL io.airbyte.version=0.1.14
1212
LABEL io.airbyte.name=airbyte/destination-mysql

ā€Žairbyte-integrations/connectors/destination-oracle-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.0
11+
LABEL io.airbyte.version=0.1.1
1212
LABEL io.airbyte.name=airbyte/destination-oracle-strict-encrypt

ā€Žairbyte-integrations/connectors/destination-oracle/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.11
11+
LABEL io.airbyte.version=0.1.12
1212
LABEL io.airbyte.name=airbyte/destination-oracle

ā€Žairbyte-integrations/connectors/destination-postgres-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.0
11+
LABEL io.airbyte.version=0.1.1
1212
LABEL io.airbyte.name=airbyte/destination-postgres-strict-encrypt

ā€Žairbyte-integrations/connectors/destination-postgres/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.11
11+
LABEL io.airbyte.version=0.3.12
1212
LABEL io.airbyte.name=airbyte/destination-postgres

ā€Žairbyte-integrations/connectors/destination-redshift/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.19
11+
LABEL io.airbyte.version=0.3.20
1212
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
FROM airbyte/integration-base-java:dev
22

3+
# uncomment to run Yourkit java profiling
4+
#RUN apt-get update && apt-get install -y curl zip
5+
#
6+
#RUN curl -o /tmp/YourKit-JavaProfiler-2021.3-docker.zip https://www.yourkit.com/download/docker/YourKit-JavaProfiler-2021.3-docker.zip && \
7+
# unzip /tmp/YourKit-JavaProfiler-2021.3-docker.zip -d /usr/local && \
8+
# rm /tmp/YourKit-JavaProfiler-2021.3-docker.zip
9+
310
WORKDIR /airbyte
411

512
ENV APPLICATION destination-snowflake
@@ -8,5 +15,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
815

916
RUN tar xf ${APPLICATION}.tar --strip-components=1
1017

11-
LABEL io.airbyte.version=0.3.16
18+
LABEL io.airbyte.version=0.3.17
1219
LABEL io.airbyte.name=airbyte/destination-snowflake

ā€Žairbyte-integrations/connectors/destination-snowflake/build.gradle

+15-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,21 @@ plugins {
66

77
application {
88
mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestination'
9-
applicationDefaultJvmArgs = ['-XX:MaxRAMPercentage=75.0']
9+
// enable when profiling
10+
applicationDefaultJvmArgs = [
11+
12+
'-XX:MaxRAMPercentage=75.0',
13+
// '-XX:NativeMemoryTracking=detail',
14+
// "-Djava.rmi.server.hostname=localhost",
15+
// '-Dcom.sun.management.jmxremote=true',
16+
// '-Dcom.sun.management.jmxremote.port=6000',
17+
// "-Dcom.sun.management.jmxremote.rmi.port=6000",
18+
// '-Dcom.sun.management.jmxremote.local.only=false',
19+
// '-Dcom.sun.management.jmxremote.authenticate=false',
20+
// '-Dcom.sun.management.jmxremote.ssl=false',
21+
// '-agentpath:/usr/local/YourKit-JavaProfiler-2021.3/bin/linux-x86-64/libyjpagent.so=port=10001,listen=all'
22+
]
23+
1024
}
1125

1226
dependencies {

ā€Žairbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java

+15
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.fasterxml.jackson.databind.node.ObjectNode;
9+
import com.google.common.annotations.VisibleForTesting;
910
import com.google.common.base.Preconditions;
1011
import io.airbyte.commons.io.IOs;
1112
import io.airbyte.commons.json.Jsons;
1213
import io.airbyte.commons.resources.MoreResources;
1314
import io.airbyte.commons.string.Strings;
1415
import io.airbyte.db.jdbc.JdbcUtils;
16+
import io.airbyte.integrations.base.AirbyteMessageConsumer;
1517
import io.airbyte.integrations.base.JavaBaseConstants;
1618
import io.airbyte.integrations.destination.ExtendedNameTransformer;
1719
import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
@@ -20,13 +22,19 @@
2022
import io.airbyte.protocol.models.AirbyteMessage;
2123
import io.airbyte.protocol.models.CatalogHelpers;
2224
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
25+
26+
import java.io.File;
27+
import java.io.IOException;
2328
import java.nio.file.Path;
2429
import java.sql.SQLException;
2530
import java.util.ArrayList;
2631
import java.util.Collections;
2732
import java.util.List;
33+
import java.util.Optional;
34+
import java.util.Scanner;
2835
import java.util.stream.Collectors;
2936
import org.junit.jupiter.api.Disabled;
37+
import org.junit.jupiter.api.Test;
3038
import org.junit.jupiter.params.ParameterizedTest;
3139
import org.junit.jupiter.params.provider.ArgumentsSource;
3240

@@ -162,4 +170,11 @@ public void testSyncWithBillionRecords(final String messagesFilename, final Stri
162170
runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false);
163171
}
164172

173+
174+
private <T> T parseConfig(final String path, Class<T> clazz) throws IOException {
175+
return Jsons.deserialize(MoreResources.readResource(path), clazz);
176+
}
177+
private JsonNode parseConfig(final String path) throws IOException {
178+
return Jsons.deserialize(MoreResources.readResource(path));
179+
}
165180
}

ā€Žairbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import com.google.common.collect.ImmutableMap;
88
import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
99
import io.airbyte.workers.process.ProcessFactory;
10+
1011
import java.util.Map;
12+
1113
import org.apache.commons.lang3.tuple.ImmutablePair;
1214

1315
public class NormalizationRunnerFactory {
@@ -30,17 +32,21 @@ public class NormalizationRunnerFactory {
3032
.put("airbyte/destination-snowflake", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.SNOWFLAKE))
3133
.build();
3234

33-
public static NormalizationRunner create(final String imageName, final ProcessFactory processFactory) {
34-
final String imageNameWithoutTag = imageName.split(":")[0];
35-
if (NORMALIZATION_MAPPING.containsKey(imageNameWithoutTag)) {
36-
final var valuePair = NORMALIZATION_MAPPING.get(imageNameWithoutTag);
35+
public static NormalizationRunner create(final String connectorImageName, final ProcessFactory processFactory) {
36+
final var valuePair = getNormalizationInfoForConnector(connectorImageName);
3737
return new DefaultNormalizationRunner(
3838
valuePair.getRight(),
3939
processFactory,
4040
String.format("%s:%s", valuePair.getLeft(), NORMALIZATION_VERSION));
41+
}
42+
43+
public static ImmutablePair<String, DestinationType> getNormalizationInfoForConnector(final String connectorImageName) {
44+
final String imageNameWithoutTag = connectorImageName.contains(":") ? connectorImageName.split(":")[0] : connectorImageName;
45+
if (NORMALIZATION_MAPPING.containsKey(imageNameWithoutTag)) {
46+
return NORMALIZATION_MAPPING.get(imageNameWithoutTag);
4147
} else {
4248
throw new IllegalStateException(
43-
String.format("Requested normalization for %s, but it is not included in the normalization mappings.", imageName));
49+
String.format("Requested normalization for %s, but it is not included in the normalization mappings.", connectorImageName));
4450
}
4551
}
4652

0 commit comments

Comments
Ā (0)