diff --git a/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy b/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy index a5fbc278a24e..2d6df33f4f1b 100644 --- a/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy @@ -80,7 +80,7 @@ job(jobName) { switches("--info") switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'") switches("-DintegrationTestRunner=dataflow") - tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOIT") + tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOITPerformance") } } } diff --git a/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy new file mode 100644 index 000000000000..f43da5a0531a --- /dev/null +++ b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as common +import PostcommitJobBuilder +import Kubernetes + +String jobName = "beam_PostCommit_Java_SingleStoreIO_IT" + +void waitForPodWithLabel(job, Kubernetes k8s, String label) { + job.steps { + shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for-pod-with-label.sh ${label} 600") + } +} + +void waitFor(job, Kubernetes k8s, String resource) { + job.steps { + shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for.sh ${resource} 600") + } +} + + +// This job runs the integration test of java SingleStoreIO class. +PostcommitJobBuilder.postCommitJob(jobName, + 'Run Java SingleStoreIO_IT', 'Java SingleStoreIO Integration Test',this) { + description('Runs the Java SingleStoreIO Integration Test.') + + // Set common parameters. 
+ common.setTopLevelMainJobProperties(delegate) + + // Deploy SingleStoreDB cluster + String namespace = common.getKubernetesNamespace(jobName) + String kubeconfigPath = common.getKubeconfigLocationForNamespace(namespace) + Kubernetes k8s = Kubernetes.create(delegate, kubeconfigPath, namespace) + + k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-rbac.yaml")) + k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-cluster-crd.yaml")) + k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-operator.yaml")) + waitForPodWithLabel(delegate, k8s, "sdb-operator") + + k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-cluster.yaml")) + waitFor(delegate, k8s, "memsqlclusters.memsql.com") + + String singlestoreHostName = "LOAD_BALANCER_IP" + k8s.loadBalancerIP("svc-sdb-cluster-ddl", singlestoreHostName) + + // Define test options + Map pipelineOptions = [ + tempRoot : 'gs://temp-storage-for-perf-tests', + project : 'apache-beam-testing', + runner : 'DataflowRunner', + singleStoreServerName : "\$${singlestoreHostName}", + singleStoreUsername : "admin", + singleStorePassword : "secretpass", + singleStorePort: "3306", + numberOfRecords: "1000", + ] + + // Gradle goals for this job. + steps { + gradle { + rootBuildScriptDir(common.checkoutDir) + common.setGradleSwitches(delegate) + switches("--info") + switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'") + switches("-DintegrationTestRunner=dataflow") + tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIODefaultMapperIT") + tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOSchemaTransformIT") + } + } + } diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapper.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapper.java new file mode 100644 index 000000000000..72d5f3ffa28b --- /dev/null +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapper.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.singlestore; + +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.CHAR; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NULL; +import static java.sql.Types.REAL; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; + +import java.io.Serializable; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableString; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.DateTime; +import org.joda.time.LocalDate; +import org.joda.time.chrono.ISOChronology; + +/** RowMapper that maps {@link ResultSet} row to the {@link Row}. 
*/ +class SingleStoreDefaultRowMapper + implements SingleStoreIO.RowMapperWithInit, SingleStoreIO.RowMapperWithCoder { + @Nullable Schema schema = null; + List converters = new ArrayList<>(); + + @Override + public void init(ResultSetMetaData metaData) throws SQLException { + for (int i = 0; i < metaData.getColumnCount(); i++) { + converters.add(ResultSetFieldConverter.of(metaData.getColumnType(i + 1))); + } + + Schema.Builder schemaBuilder = new Schema.Builder(); + for (int i = 0; i < metaData.getColumnCount(); i++) { + schemaBuilder.addField(converters.get(i).getSchemaField(metaData, i + 1)); + } + this.schema = schemaBuilder.build(); + } + + @Override + public Row mapRow(ResultSet resultSet) throws Exception { + if (schema == null) { + throw new UnsupportedOperationException("mapRow is called before init"); + } + + Row.Builder rowBuilder = Row.withSchema(schema); + + int fieldCount = schema.getFieldCount(); + for (int i = 0; i < fieldCount; i++) { + Object value = converters.get(i).getValue(resultSet, i + 1); + + if (resultSet.wasNull() || value == null) { + rowBuilder.addValue(null); + } else { + rowBuilder.addValue(value); + } + } + + return rowBuilder.build(); + } + + @Override + public SchemaCoder getCoder() throws Exception { + if (schema == null) { + throw new UnsupportedOperationException("getCoder is called before init"); + } + + return RowCoder.of(this.schema); + } + + abstract static class ResultSetFieldConverter implements Serializable { + abstract @Nullable Object getValue(ResultSet rs, Integer index) throws SQLException; + + Schema.Field getSchemaField(ResultSetMetaData md, Integer index) throws SQLException { + String label = md.getColumnLabel(index); + return Schema.Field.of(label, getSchemaFieldType(md, index)) + .withNullable(md.isNullable(index) == java.sql.ResultSetMetaData.columnNullable); + } + + abstract Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) + throws SQLException; + + /** + * Interface implemented by functions that extract values of different types from a JDBC + * ResultSet. 
+ */ + @FunctionalInterface + interface ResultSetFieldExtractor extends Serializable { + @Nullable + Object extract(ResultSet rs, Integer index) throws SQLException; + } + + static ResultSetFieldConverter of(int columnType) { + switch (columnType) { + case BIT: + return new DirectResultSetFieldConverter(BOOLEAN, ResultSet::getBoolean); + case TINYINT: + return new DirectResultSetFieldConverter(BYTE, ResultSet::getByte); + case SMALLINT: + return new DirectResultSetFieldConverter(INT16, ResultSet::getShort); + case INTEGER: + return new DirectResultSetFieldConverter(INT32, ResultSet::getInt); + case BIGINT: + return new DirectResultSetFieldConverter(INT64, ResultSet::getLong); + case REAL: + return new DirectResultSetFieldConverter(FLOAT, ResultSet::getFloat); + case DOUBLE: + return new DirectResultSetFieldConverter(Schema.FieldType.DOUBLE, ResultSet::getDouble); + case DECIMAL: + return new DirectResultSetFieldConverter( + Schema.FieldType.DECIMAL, ResultSet::getBigDecimal); + case TIMESTAMP: + return new TimestampResultSetFieldConverter(); + case DATE: + return new DateResultSetFieldConverter(); + case TIME: + return new TimeResultSetFieldConverter(); + case LONGVARBINARY: + case VARBINARY: + case BINARY: + return new BinaryResultSetFieldConverter(); + case LONGVARCHAR: + case VARCHAR: + case CHAR: + return new CharResultSetFieldConverter(); + case NULL: + return new DirectResultSetFieldConverter(STRING, ResultSet::getString); + default: + throw new UnsupportedOperationException( + "Converting " + columnType + " to Beam schema type is not supported"); + } + } + } + + static class DirectResultSetFieldConverter extends ResultSetFieldConverter { + Schema.FieldType fieldType; + ResultSetFieldExtractor extractor; + + public DirectResultSetFieldConverter( + Schema.FieldType fieldType, ResultSetFieldExtractor extractor) { + this.fieldType = fieldType; + this.extractor = extractor; + } + + @Override + @Nullable + Object getValue(ResultSet rs, Integer index) throws SQLException { + return extractor.extract(rs, index); + } + + @Override + Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) { + return fieldType; + } + } + + static class CharResultSetFieldConverter extends ResultSetFieldConverter { + @Override + @Nullable + Object getValue(ResultSet rs, Integer index) throws SQLException { + return rs.getString(index); + } + + @Override + Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) throws SQLException { + int size = md.getPrecision(index); + return Schema.FieldType.logicalType(VariableString.of(size)); + } + } + + static class BinaryResultSetFieldConverter extends ResultSetFieldConverter { + @Override + @Nullable + Object getValue(ResultSet rs, Integer index) throws SQLException { + return rs.getBytes(index); + } + + @Override + Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) throws SQLException { + int size = md.getPrecision(index); + return Schema.FieldType.logicalType(VariableBytes.of(size)); + } + } + + static class TimestampResultSetFieldConverter extends ResultSetFieldConverter { + @Override + @Nullable + Object getValue(ResultSet rs, Integer index) throws SQLException { + Timestamp ts = + rs.getTimestamp(index, Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC))); + if (ts == null) { + return null; + } + return new DateTime(ts.toInstant().toEpochMilli(), ISOChronology.getInstanceUTC()); + } + + @Override + Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) { + return 
Schema.FieldType.DATETIME; + } + } + + static class TimeResultSetFieldConverter extends ResultSetFieldConverter { + @Override + @Nullable + Object getValue(ResultSet rs, Integer index) throws SQLException { + Time time = rs.getTime(index, Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC))); + if (time == null) { + return null; + } + return new DateTime(time.getTime(), ISOChronology.getInstanceUTC()) + .withDate(new LocalDate(0L)); + } + + @Override + Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) { + return Schema.FieldType.DATETIME; + } + } + + static class DateResultSetFieldConverter extends ResultSetFieldConverter { + @Override + @Nullable + Object getValue(ResultSet rs, Integer index) throws SQLException { + // TODO(https://github.com/apache/beam/issues/19215) import when joda LocalDate is removed. + java.time.LocalDate date = rs.getObject(index, java.time.LocalDate.class); + if (date == null) { + return null; + } + ZonedDateTime zdt = date.atStartOfDay(ZoneOffset.UTC); + return new DateTime(zdt.toInstant().toEpochMilli(), ISOChronology.getInstanceUTC()); + } + + @Override + Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) { + return Schema.FieldType.DATETIME; + } + } +} diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java new file mode 100644 index 000000000000..2035ce0554f1 --- /dev/null +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.singlestore; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +/** + * UserDataMapper that maps {@link Row} objects. ARRAYs, ITTERABLEs, MAPs and nested ROWs are not + * supported. 
+ */ +final class SingleStoreDefaultUserDataMapper implements SingleStoreIO.UserDataMapper { + + private final transient DateTimeFormatter formatter = + DateTimeFormat.forPattern("yyyy-MM-DD' 'HH:mm:ss.SSS"); + + private String convertLogicalTypeFieldToString(Schema.FieldType type, Object value) { + checkArgument( + type.getTypeName().isLogicalType(), + "convertLogicalTypeFieldToString accepts only logical types"); + + Schema.LogicalType logicalType = + (Schema.LogicalType) type.getLogicalType(); + if (logicalType == null) { + throw new UnsupportedOperationException("Failed to extract logical type"); + } + + Schema.FieldType baseType = logicalType.getBaseType(); + Object baseValue = logicalType.toBaseType(value); + return convertFieldToString(baseType, baseValue); + } + + private String convertFieldToString(Schema.FieldType type, Object value) { + switch (type.getTypeName()) { + case BYTE: + return ((Byte) value).toString(); + case INT16: + return ((Short) value).toString(); + case INT32: + return ((Integer) value).toString(); + case INT64: + return ((Long) value).toString(); + case DECIMAL: + return ((BigDecimal) value).toString(); + case FLOAT: + return ((Float) value).toString(); + case DOUBLE: + return ((Double) value).toString(); + case STRING: + return (String) value; + case DATETIME: + return formatter.print((Instant) value); + case BOOLEAN: + return ((Boolean) value) ? "1" : "0"; + case BYTES: + return new String((byte[]) value, StandardCharsets.UTF_8); + case ARRAY: + throw new UnsupportedOperationException( + "Writing of ARRAY type is not supported by the default UserDataMapper"); + case ITERABLE: + throw new UnsupportedOperationException( + "Writing of ITERABLE type is not supported by the default UserDataMapper"); + case MAP: + throw new UnsupportedOperationException( + "Writing of MAP type is not supported by the default UserDataMapper"); + case ROW: + throw new UnsupportedOperationException( + "Writing of nested ROW type is not supported by the default UserDataMapper"); + case LOGICAL_TYPE: + return convertLogicalTypeFieldToString(type, value); + default: + throw new UnsupportedOperationException( + String.format( + "Writing of %s type is not supported by the default UserDataMapper", + type.getTypeName().name())); + } + } + + @Override + public List mapRow(Row element) { + List res = new ArrayList<>(); + + Schema s = element.getSchema(); + for (int i = 0; i < s.getFieldCount(); i++) { + res.add(convertFieldToString(s.getField(i).getType(), element.getValue(i))); + } + + return res; + } +} diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java index 6873ae6b8b37..de2a68052789 100644 --- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java @@ -30,6 +30,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.Statement; import java.util.ArrayList; import java.util.List; @@ -38,6 +39,8 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.Create; import 
org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; @@ -53,6 +56,7 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.Row; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.DelegatingStatement; import org.checkerframework.checker.nullness.qual.Nullable; @@ -168,6 +172,14 @@ public static Read read() { return new AutoValue_SingleStoreIO_Read.Builder().setOutputParallelization(true).build(); } + /** Read Beam {@link Row}s from a SingleStoreDB datasource. */ + public static Read readRows() { + return new AutoValue_SingleStoreIO_Read.Builder() + .setRowMapper(new SingleStoreDefaultRowMapper()) + .setOutputParallelization(true) + .build(); + } + /** * Like {@link #read}, but executes multiple instances of the query on the same table for each * database partition. @@ -178,6 +190,16 @@ public static ReadWithPartitions readWithPartitions() { return new AutoValue_SingleStoreIO_ReadWithPartitions.Builder().build(); } + /** + * Like {@link #readRows}, but executes multiple instances of the query on the same table for each + * database partition. + */ + public static ReadWithPartitions readWithPartitionsRows() { + return new AutoValue_SingleStoreIO_ReadWithPartitions.Builder() + .setRowMapper(new SingleStoreDefaultRowMapper()) + .build(); + } + /** * Write data to a SingleStoreDB datasource. * @@ -187,6 +209,13 @@ public static Write write() { return new AutoValue_SingleStoreIO_Write.Builder().build(); } + /** Write Beam {@link Row}s to a SingleStoreDB datasource. */ + public static Write writeRows() { + return new AutoValue_SingleStoreIO_Write.Builder() + .setUserDataMapper(new SingleStoreDefaultUserDataMapper()) + .build(); + } + /** * An interface used by {@link Read} and {@link ReadWithPartitions} for converting each row of the * {@link ResultSet} into an element of the resulting {@link PCollection}. @@ -196,6 +225,19 @@ public interface RowMapper extends Serializable { T mapRow(ResultSet resultSet) throws Exception; } + /** + * A RowMapper that requires initialization. init method is called during pipeline construction + * time. + */ + public interface RowMapperWithInit extends RowMapper { + void init(ResultSetMetaData resultSetMetaData) throws Exception; + } + + /** A RowMapper that provides a Coder for resulting PCollection. */ + public interface RowMapperWithCoder extends RowMapper { + Coder getCoder() throws Exception; + } + /** * An interface used by the SingleStoreIO {@link Read} to set the parameters of the {@link * PreparedStatement}. @@ -219,6 +261,7 @@ public interface UserDataMapper extends Serializable { * A POJO describing a SingleStoreDB {@link DataSource} by providing all properties needed to * create it. 
*/ + @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract static class DataSourceConfiguration implements Serializable { abstract @Nullable String getEndpoint(); @@ -406,6 +449,15 @@ public PCollection expand(PBegin input) { Preconditions.checkArgumentNotNull(rowMapper, "withRowMapper() is required"); String actualQuery = SingleStoreUtil.getSelectQuery(getTable(), getQuery()); + if (rowMapper instanceof RowMapperWithInit) { + try { + ((RowMapperWithInit) rowMapper) + .init(getResultSetMetadata(dataSourceConfiguration, actualQuery)); + } catch (Exception e) { + throw new SingleStoreRowMapperInitializationException(e); + } + } + Coder coder = SingleStoreUtil.inferCoder( rowMapper, @@ -432,6 +484,12 @@ public PCollection expand(PBegin input) { return output; } + public static class SingleStoreRowMapperInitializationException extends RuntimeException { + SingleStoreRowMapperInitializationException(Throwable cause) { + super("Failed to initialize RowMapper", cause); + } + } + private static class ReadFn extends DoFn { DataSourceConfiguration dataSourceConfiguration; String query; @@ -595,6 +653,15 @@ public PCollection expand(PBegin input) { String actualQuery = SingleStoreUtil.getSelectQuery(getTable(), getQuery()); + if (rowMapper instanceof RowMapperWithInit) { + try { + ((RowMapperWithInit) rowMapper) + .init(getResultSetMetadata(dataSourceConfiguration, actualQuery)); + } catch (Exception e) { + throw new Read.SingleStoreRowMapperInitializationException(e); + } + } + Coder coder = SingleStoreUtil.inferCoder( rowMapper, @@ -715,6 +782,28 @@ public void populateDisplayData(DisplayData.Builder builder) { } } + private static ResultSetMetaData getResultSetMetadata( + DataSourceConfiguration dataSourceConfiguration, String query) throws Exception { + DataSource dataSource = dataSourceConfiguration.getDataSource(); + Connection conn = dataSource.getConnection(); + try { + PreparedStatement stmt = + conn.prepareStatement(String.format("SELECT * FROM (%s) LIMIT 0", query)); + try { + ResultSetMetaData md = stmt.getMetaData(); + if (md == null) { + throw new Exception("ResultSetMetaData is null"); + } + + return md; + } finally { + stmt.close(); + } + } finally { + conn.close(); + } + } + /** * A {@link PTransform} for writing data to SingleStoreDB. It is used by {@link * SingleStoreIO#write()}. diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java index c76b0a678c15..f64885e723fd 100644 --- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java @@ -42,6 +42,14 @@ public static Coder inferCoder( CoderRegistry registry, SchemaRegistry schemaRegistry, Logger log) { + if (rowMapper instanceof SingleStoreIO.RowMapperWithCoder) { + try { + return ((SingleStoreIO.RowMapperWithCoder) rowMapper).getCoder(); + } catch (Exception e) { + log.warn("Unable to infer a coder from RowMapper. 
Attempting to infer a coder from type."); + } + } + TypeDescriptor outputType = TypeDescriptors.extractFromTypeParameters( rowMapper, diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java new file mode 100644 index 000000000000..6951e9b31837 --- /dev/null +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.singlestore.schematransform; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.singlestore.SingleStoreIO; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * Configuration for reading from SignleStoreDB. + * + *
<p>
This class is meant to be used with {@link SingleStoreSchemaTransformReadProvider}. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class SingleStoreSchemaTransformReadConfiguration { + + /** Instantiates a {@link SingleStoreSchemaTransformReadConfiguration.Builder}. */ + public static Builder builder() { + return new AutoValue_SingleStoreSchemaTransformReadConfiguration.Builder(); + } + + private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema(); + private static final TypeDescriptor TYPE_DESCRIPTOR = + TypeDescriptor.of(SingleStoreSchemaTransformReadConfiguration.class); + private static final SerializableFunction + ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR); + + /** Serializes configuration to a {@link Row}. */ + public Row toBeamRow() { + return ROW_SERIALIZABLE_FUNCTION.apply(this); + } + + @Nullable + public abstract SingleStoreIO.DataSourceConfiguration getDataSourceConfiguration(); + + @Nullable + public abstract String getQuery(); + + @Nullable + public abstract String getTable(); + + @Nullable + public abstract Boolean getOutputParallelization(); + + @Nullable + public abstract Boolean getWithPartitions(); + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value); + + public abstract Builder setTable(String value); + + public abstract Builder setQuery(String value); + + public abstract Builder setOutputParallelization(Boolean value); + + public abstract Builder setWithPartitions(Boolean value); + + /** Builds the {@link SingleStoreSchemaTransformReadConfiguration} configuration. */ + public abstract SingleStoreSchemaTransformReadConfiguration build(); + } +} diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java new file mode 100644 index 000000000000..601fd1a89d11 --- /dev/null +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.singlestore.schematransform; + +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.io.singlestore.SingleStoreIO; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB read jobs configured + * using {@link SingleStoreSchemaTransformReadConfiguration}. + */ +public class SingleStoreSchemaTransformReadProvider + extends TypedSchemaTransformProvider { + private static final String OUTPUT_TAG = "OUTPUT"; + + /** Returns the expected class of the configuration. */ + @Override + protected Class configurationClass() { + return SingleStoreSchemaTransformReadConfiguration.class; + } + + /** Returns the expected {@link SchemaTransform} of the configuration. */ + @Override + protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) { + return new SingleStoreReadSchemaTransform(configuration); + } + + /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:singlestore_read:v1"; + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since + * no input is expected, this returns an empty list. + */ + @Override + public List inputCollectionNames() { + return Collections.emptyList(); + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since + * a single output is expected, this returns a list with a single name. + */ + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + /** + * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using + * {@link SingleStoreSchemaTransformReadConfiguration}. + */ + private static class SingleStoreReadSchemaTransform implements SchemaTransform { + private final SingleStoreSchemaTransformReadConfiguration configuration; + + SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) { + this.configuration = configuration; + } + + /** Implements {@link SchemaTransform} buildTransform method. */ + @Override + public PTransform buildTransform() { + return new PCollectionRowTupleTransform(configuration); + } + } + + /** + * An implementation of {@link PTransform} for SingleStoreDB read jobs configured using {@link + * SingleStoreSchemaTransformReadConfiguration}. 
+ */ + static class PCollectionRowTupleTransform + extends PTransform { + + private final SingleStoreSchemaTransformReadConfiguration configuration; + + PCollectionRowTupleTransform(SingleStoreSchemaTransformReadConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + if (!input.getAll().isEmpty()) { + throw new IllegalArgumentException( + String.format( + "%s %s input is expected to be empty", + input.getClass().getSimpleName(), getClass().getSimpleName())); + } + SingleStoreIO.DataSourceConfiguration dataSourceConfiguration = + configuration.getDataSourceConfiguration(); + String table = configuration.getTable(); + String query = configuration.getQuery(); + Boolean outputParallelization = configuration.getOutputParallelization(); + Boolean withPartitions = configuration.getWithPartitions(); + + Preconditions.checkArgument( + !(outputParallelization != null && withPartitions != null && withPartitions), + "outputParallelization parameter is not supported for partitioned read"); + + if (withPartitions != null && withPartitions) { + SingleStoreIO.ReadWithPartitions readWithPartitions = + SingleStoreIO.readWithPartitionsRows(); + + if (dataSourceConfiguration != null) { + readWithPartitions = + readWithPartitions.withDataSourceConfiguration(dataSourceConfiguration); + } + + if (table != null && !table.isEmpty()) { + readWithPartitions = readWithPartitions.withTable(table); + } + + if (query != null && !query.isEmpty()) { + readWithPartitions = readWithPartitions.withQuery(query); + } + + PCollection rows = input.getPipeline().apply(readWithPartitions); + Schema schema = rows.getSchema(); + + return PCollectionRowTuple.of(OUTPUT_TAG, rows.setRowSchema(schema)); + } else { + SingleStoreIO.Read read = SingleStoreIO.readRows(); + + if (dataSourceConfiguration != null) { + read = read.withDataSourceConfiguration(dataSourceConfiguration); + } + + if (table != null && !table.isEmpty()) { + read = read.withTable(table); + } + + if (query != null && !query.isEmpty()) { + read = read.withQuery(query); + } + + if (outputParallelization != null) { + read = read.withOutputParallelization(outputParallelization); + } + + PCollection rows = input.getPipeline().apply(read); + Schema schema = rows.getSchema(); + + return PCollectionRowTuple.of(OUTPUT_TAG, rows.setRowSchema(schema)); + } + } + } +} diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteConfiguration.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteConfiguration.java new file mode 100644 index 000000000000..2903035e402e --- /dev/null +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteConfiguration.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.singlestore.schematransform; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.singlestore.SingleStoreIO; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * Configuration for writing to SignleStoreDB. + * + *
<p>
This class is meant to be used with {@link SingleStoreSchemaTransformWriteProvider}. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class SingleStoreSchemaTransformWriteConfiguration { + + /** Instantiates a {@link SingleStoreSchemaTransformWriteConfiguration.Builder}. */ + public static Builder builder() { + return new AutoValue_SingleStoreSchemaTransformWriteConfiguration.Builder(); + } + + private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema(); + private static final TypeDescriptor + TYPE_DESCRIPTOR = TypeDescriptor.of(SingleStoreSchemaTransformWriteConfiguration.class); + private static final SerializableFunction + ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR); + + /** Serializes configuration to a {@link Row}. */ + public Row toBeamRow() { + return ROW_SERIALIZABLE_FUNCTION.apply(this); + } + + @Nullable + public abstract SingleStoreIO.DataSourceConfiguration getDataSourceConfiguration(); + + @Nullable + public abstract String getTable(); + + @Nullable + public abstract Integer getBatchSize(); + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value); + + public abstract Builder setTable(String value); + + public abstract Builder setBatchSize(Integer value); + + /** Builds the {@link SingleStoreSchemaTransformWriteConfiguration} configuration. */ + public abstract SingleStoreSchemaTransformWriteConfiguration build(); + } +} diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java new file mode 100644 index 000000000000..5b68e1e05c5e --- /dev/null +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.singlestore.schematransform; + +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.io.singlestore.SingleStoreIO; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB write jobs configured + * using {@link SingleStoreSchemaTransformWriteConfiguration}. + */ +public class SingleStoreSchemaTransformWriteProvider + extends TypedSchemaTransformProvider { + + private static final String OUTPUT_TAG = "OUTPUT"; + public static final String INPUT_TAG = "INPUT"; + + /** Returns the expected class of the configuration. */ + @Override + protected Class configurationClass() { + return SingleStoreSchemaTransformWriteConfiguration.class; + } + + /** Returns the expected {@link SchemaTransform} of the configuration. */ + @Override + protected SchemaTransform from(SingleStoreSchemaTransformWriteConfiguration configuration) { + return new SingleStoreWriteSchemaTransform(configuration); + } + + /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:singlestore_write:v1"; + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since + * no input is expected, this returns an empty list. + */ + @Override + public List inputCollectionNames() { + return Collections.emptyList(); + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since + * a single output is expected, this returns a list with a single name. + */ + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + /** + * An implementation of {@link SchemaTransform} for SingleStoreDB write jobs configured using + * {@link SingleStoreSchemaTransformWriteConfiguration}. + */ + private static class SingleStoreWriteSchemaTransform implements SchemaTransform { + private final SingleStoreSchemaTransformWriteConfiguration configuration; + + SingleStoreWriteSchemaTransform(SingleStoreSchemaTransformWriteConfiguration configuration) { + this.configuration = configuration; + } + + /** Implements {@link SchemaTransform} buildTransform method. */ + @Override + public PTransform buildTransform() { + return new PCollectionRowTupleTransform(configuration); + } + } + + /** + * An implementation of {@link PTransform} for SingleStoreDB write jobs configured using {@link + * SingleStoreSchemaTransformWriteConfiguration}. 
+ */ + static class PCollectionRowTupleTransform + extends PTransform { + + private final SingleStoreSchemaTransformWriteConfiguration configuration; + + PCollectionRowTupleTransform(SingleStoreSchemaTransformWriteConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + if (!input.has(INPUT_TAG)) { + throw new IllegalArgumentException( + String.format( + "%s %s is missing expected tag: %s", + getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG)); + } + SingleStoreIO.DataSourceConfiguration dataSourceConfiguration = + configuration.getDataSourceConfiguration(); + String table = configuration.getTable(); + Integer batchSize = configuration.getBatchSize(); + + SingleStoreIO.Write write = SingleStoreIO.writeRows(); + + if (dataSourceConfiguration != null) { + write = write.withDataSourceConfiguration(dataSourceConfiguration); + } + + if (table != null && !table.isEmpty()) { + write = write.withTable(table); + } + + if (batchSize != null) { + write = write.withBatchSize(batchSize); + } + + PCollection res = input.get(INPUT_TAG).apply(write); + Schema.Builder schemaBuilder = new Schema.Builder(); + schemaBuilder.addField("rowsWritten", Schema.FieldType.INT32); + Schema schema = schemaBuilder.build(); + return PCollectionRowTuple.of( + OUTPUT_TAG, + res.apply( + MapElements.into(TypeDescriptor.of(Row.class)) + .via( + (SerializableFunction) + a -> { + Row.Builder rowBuilder = Row.withSchema(schema); + rowBuilder.addValue(a); + return rowBuilder.build(); + })) + .setRowSchema(schema)); + } + } +} diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/package-info.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/package-info.java new file mode 100644 index 000000000000..d71e5eb209e9 --- /dev/null +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** SingleStoreIO SchemaTransforms. */ +package org.apache.beam.sdk.io.singlestore.schematransform; diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapperTest.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapperTest.java new file mode 100644 index 000000000000..54de09a4e953 --- /dev/null +++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapperTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.singlestore; + +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.CHAR; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NULL; +import static java.sql.Types.REAL; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import com.singlestore.jdbc.client.result.ResultSetMetaData; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** Test DefaultRowMapper. 
*/ +@RunWith(JUnit4.class) +public class SingleStoreDefaultRowMapperTest { + @Test + public void testEmptyRow() throws Exception { + ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class); + Mockito.when(md.getColumnCount()).thenReturn(0); + ResultSet res = Mockito.mock(ResultSet.class); + + SingleStoreDefaultRowMapper mapper = new SingleStoreDefaultRowMapper(); + mapper.init(md); + Schema s = mapper.getCoder().getSchema(); + Row r = mapper.mapRow(res); + + assertEquals(0, s.getFieldCount()); + assertEquals(0, r.getFieldCount()); + } + + @Test + public void testAllDataTypes() throws Exception { + ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class); + + Mockito.when(md.getColumnCount()).thenReturn(17); + + Mockito.when(md.getColumnType(1)).thenReturn(BIT); + Mockito.when(md.getColumnType(2)).thenReturn(TINYINT); + Mockito.when(md.getColumnType(3)).thenReturn(SMALLINT); + Mockito.when(md.getColumnType(4)).thenReturn(INTEGER); + Mockito.when(md.getColumnType(5)).thenReturn(BIGINT); + Mockito.when(md.getColumnType(6)).thenReturn(REAL); + Mockito.when(md.getColumnType(7)).thenReturn(DOUBLE); + Mockito.when(md.getColumnType(8)).thenReturn(DECIMAL); + Mockito.when(md.getColumnType(9)).thenReturn(TIMESTAMP); + Mockito.when(md.getColumnType(10)).thenReturn(DATE); + Mockito.when(md.getColumnType(11)).thenReturn(TIME); + Mockito.when(md.getColumnType(12)).thenReturn(LONGVARBINARY); + Mockito.when(md.getColumnType(13)).thenReturn(VARBINARY); + Mockito.when(md.getColumnType(14)).thenReturn(BINARY); + Mockito.when(md.getColumnType(15)).thenReturn(LONGVARCHAR); + Mockito.when(md.getColumnType(16)).thenReturn(VARCHAR); + Mockito.when(md.getColumnType(17)).thenReturn(CHAR); + + for (int i = 12; i <= 17; i++) { + Mockito.when(md.getPrecision(i)).thenReturn(10); + } + + for (int i = 1; i <= md.getColumnCount(); i++) { + Mockito.when(md.getColumnLabel(i)).thenReturn("c" + i); + Mockito.when(md.isNullable(i)).thenReturn(java.sql.ResultSetMetaData.columnNoNulls); + } + + ResultSet res = Mockito.mock(ResultSet.class); + + Mockito.when(res.getBoolean(1)).thenReturn(true); + Mockito.when(res.getByte(2)).thenReturn((byte) 10); + Mockito.when(res.getShort(3)).thenReturn((short) 10); + Mockito.when(res.getInt(4)).thenReturn(10); + Mockito.when(res.getLong(5)).thenReturn((long) 10); + Mockito.when(res.getFloat(6)).thenReturn((float) 10.1); + Mockito.when(res.getDouble(7)).thenReturn(10.1); + Mockito.when(res.getBigDecimal(8)).thenReturn(new BigDecimal("100.100")); + Mockito.when(res.getTimestamp(Mockito.eq(9), Mockito.any())) + .thenReturn(Timestamp.valueOf("2022-10-10 10:10:10")); + Mockito.when(res.getObject(10, java.time.LocalDate.class)) + .thenReturn(LocalDate.of(2022, 10, 10)); + Mockito.when(res.getTime(Mockito.eq(11), Mockito.any())).thenReturn(Time.valueOf("10:10:10")); + Mockito.when(res.getBytes(12)).thenReturn("asd".getBytes(StandardCharsets.UTF_8)); + Mockito.when(res.getBytes(13)).thenReturn("asd".getBytes(StandardCharsets.UTF_8)); + Mockito.when(res.getBytes(14)).thenReturn("asd\0\0\0\0\0\0\0".getBytes(StandardCharsets.UTF_8)); + Mockito.when(res.getString(15)).thenReturn("asd"); + Mockito.when(res.getString(16)).thenReturn("asd"); + Mockito.when(res.getString(17)).thenReturn("asd"); + + Mockito.when(res.wasNull()).thenReturn(false); + + SingleStoreDefaultRowMapper mapper = new SingleStoreDefaultRowMapper(); + mapper.init(md); + Schema s = mapper.getCoder().getSchema(); + Row r = mapper.mapRow(res); + + assertEquals(17, s.getFieldCount()); + for (int i = 0; i < s.getFieldCount(); i++) { + 
assertEquals("c" + (i + 1), s.getField(i).getName()); + } + + assertEquals(Schema.FieldType.BOOLEAN, s.getField(0).getType()); + assertEquals(Schema.FieldType.BYTE, s.getField(1).getType()); + assertEquals(Schema.FieldType.INT16, s.getField(2).getType()); + assertEquals(Schema.FieldType.INT32, s.getField(3).getType()); + assertEquals(Schema.FieldType.INT64, s.getField(4).getType()); + assertEquals(Schema.FieldType.FLOAT, s.getField(5).getType()); + assertEquals(Schema.FieldType.DOUBLE, s.getField(6).getType()); + assertEquals(Schema.FieldType.DECIMAL, s.getField(7).getType()); + assertEquals(Schema.FieldType.DATETIME, s.getField(8).getType()); + assertEquals(Schema.FieldType.DATETIME, s.getField(9).getType()); + assertEquals(Schema.FieldType.DATETIME, s.getField(10).getType()); + assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(11).getType().getTypeName()); + assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(12).getType().getTypeName()); + assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(13).getType().getTypeName()); + assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(14).getType().getTypeName()); + assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(15).getType().getTypeName()); + assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(16).getType().getTypeName()); + assertEquals(Schema.FieldType.BYTES, s.getField(11).getType().getLogicalType().getBaseType()); + assertEquals(Schema.FieldType.BYTES, s.getField(12).getType().getLogicalType().getBaseType()); + assertEquals(Schema.FieldType.BYTES, s.getField(13).getType().getLogicalType().getBaseType()); + assertEquals(Schema.FieldType.STRING, s.getField(14).getType().getLogicalType().getBaseType()); + assertEquals(Schema.FieldType.STRING, s.getField(15).getType().getLogicalType().getBaseType()); + assertEquals(Schema.FieldType.STRING, s.getField(16).getType().getLogicalType().getBaseType()); + + assertEquals(17, r.getFieldCount()); + + assertEquals(true, r.getBoolean(0)); + assertEquals((Byte) (byte) 10, r.getByte(1)); + assertEquals((Short) (short) 10, r.getInt16(2)); + assertEquals((Integer) 10, r.getInt32(3)); + assertEquals((Long) (long) 10, r.getInt64(4)); + assertEquals((Float) (float) 10.1, r.getFloat(5)); + assertEquals((Double) 10.1, r.getDouble(6)); + assertEquals(new BigDecimal("100.100"), r.getDecimal(7)); + assertEquals(0, new DateTime("2022-10-10T10:10:10").compareTo(r.getDateTime(8))); + assertEquals(0, new DateTime("2022-10-10T00:00:00Z").compareTo(r.getDateTime(9))); + assertEquals(0, new DateTime("1970-01-01T10:10:10").compareTo(r.getDateTime(10))); + assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(11)); + assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(12)); + assertArrayEquals("asd\0\0\0\0\0\0\0".getBytes(StandardCharsets.UTF_8), r.getBytes(13)); + assertEquals("asd", r.getString(14)); + assertEquals("asd", r.getString(15)); + assertEquals("asd", r.getString(16)); + } + + @Test + public void testNullValues() throws Exception { + ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class); + + Mockito.when(md.getColumnCount()).thenReturn(18); + + Mockito.when(md.getColumnType(1)).thenReturn(BIT); + Mockito.when(md.getColumnType(2)).thenReturn(TINYINT); + Mockito.when(md.getColumnType(3)).thenReturn(SMALLINT); + Mockito.when(md.getColumnType(4)).thenReturn(INTEGER); + Mockito.when(md.getColumnType(5)).thenReturn(BIGINT); + Mockito.when(md.getColumnType(6)).thenReturn(REAL); + Mockito.when(md.getColumnType(7)).thenReturn(DOUBLE); + 
+    Mockito.when(md.getColumnType(8)).thenReturn(DECIMAL);
+    Mockito.when(md.getColumnType(9)).thenReturn(TIMESTAMP);
+    Mockito.when(md.getColumnType(10)).thenReturn(DATE);
+    Mockito.when(md.getColumnType(11)).thenReturn(TIME);
+    Mockito.when(md.getColumnType(12)).thenReturn(LONGVARBINARY);
+    Mockito.when(md.getColumnType(13)).thenReturn(VARBINARY);
+    Mockito.when(md.getColumnType(14)).thenReturn(BINARY);
+    Mockito.when(md.getColumnType(15)).thenReturn(LONGVARCHAR);
+    Mockito.when(md.getColumnType(16)).thenReturn(VARCHAR);
+    Mockito.when(md.getColumnType(17)).thenReturn(CHAR);
+    Mockito.when(md.getColumnType(18)).thenReturn(NULL);
+
+    for (int i = 1; i <= md.getColumnCount(); i++) {
+      Mockito.when(md.getColumnLabel(i)).thenReturn("c" + i);
+      Mockito.when(md.isNullable(i)).thenReturn(java.sql.ResultSetMetaData.columnNullable);
+    }
+
+    ResultSet res = Mockito.mock(ResultSet.class);
+
+    Mockito.when(res.getBoolean(1)).thenReturn(false);
+    Mockito.when(res.getByte(2)).thenReturn((byte) 0);
+    Mockito.when(res.getShort(3)).thenReturn((short) 0);
+    Mockito.when(res.getInt(4)).thenReturn(0);
+    Mockito.when(res.getLong(5)).thenReturn((long) 0);
+    Mockito.when(res.getFloat(6)).thenReturn((float) 0);
+    Mockito.when(res.getDouble(7)).thenReturn(0.0);
+    Mockito.when(res.getBigDecimal(8)).thenReturn(null);
+    Mockito.when(res.getTimestamp(9)).thenReturn(null);
+    Mockito.when(res.getDate(10)).thenReturn(null);
+    Mockito.when(res.getTime(11)).thenReturn(null);
+    Mockito.when(res.getBytes(12)).thenReturn(null);
+    Mockito.when(res.getBytes(13)).thenReturn(null);
+    Mockito.when(res.getBytes(14)).thenReturn(null);
+    Mockito.when(res.getString(15)).thenReturn(null);
+    Mockito.when(res.getString(16)).thenReturn(null);
+    Mockito.when(res.getString(17)).thenReturn(null);
+    Mockito.when(res.getString(18)).thenReturn(null);
+
+    Mockito.when(res.wasNull()).thenReturn(true);
+
+    SingleStoreDefaultRowMapper mapper = new SingleStoreDefaultRowMapper();
+    mapper.init(md);
+    Schema s = mapper.getCoder().getSchema();
+    Row r = mapper.mapRow(res);
+
+    assertEquals(18, s.getFieldCount());
+    for (int i = 0; i < s.getFieldCount(); i++) {
+      assertEquals("c" + (i + 1), s.getField(i).getName());
+    }
+
+    assertEquals(18, r.getFieldCount());
+    for (int i = 0; i < r.getFieldCount(); i++) {
+      assertNull(r.getValue(i));
+    }
+  }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapperTest.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapperTest.java
new file mode 100644
index 000000000000..6472ad13cff6
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapperTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test DefaultUserDataMapper. */
+@RunWith(JUnit4.class)
+public class SingleStoreDefaultUserDataMapperTest {
+  @Test
+  public void testNullValues() {}
+
+  @Test
+  public void testBigNumbers() {}
+
+  @Test
+  public void testBigNegativeNumbers() {
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    schemaBuilder.addField("byte", Schema.FieldType.BYTE);
+    schemaBuilder.addField("int16", Schema.FieldType.INT16);
+    schemaBuilder.addField("int32", Schema.FieldType.INT32);
+    schemaBuilder.addField("int64", Schema.FieldType.INT64);
+    schemaBuilder.addField("float", Schema.FieldType.FLOAT);
+    schemaBuilder.addField("double", Schema.FieldType.DOUBLE);
+    schemaBuilder.addField("decimal", Schema.FieldType.DECIMAL);
+    Schema schema = schemaBuilder.build();
+
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    rowBuilder.addValue(Byte.MIN_VALUE);
+    rowBuilder.addValue(Short.MIN_VALUE);
+    rowBuilder.addValue(Integer.MIN_VALUE);
+    rowBuilder.addValue(Long.MIN_VALUE);
+    rowBuilder.addValue(-Float.MAX_VALUE);
+    rowBuilder.addValue(-Double.MAX_VALUE);
+    rowBuilder.addValue(new BigDecimal("-10000000000000.1000000000000000000000"));
+    Row row = rowBuilder.build();
+
+    SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+    List<String> res = mapper.mapRow(row);
+
+    assertEquals(7, res.size());
+    assertEquals("-128", res.get(0));
+    assertEquals("-32768", res.get(1));
+    assertEquals("-2147483648", res.get(2));
+    assertEquals("-9223372036854775808", res.get(3));
+    assertEquals("-3.4028235E38", res.get(4));
+    assertEquals("-1.7976931348623157E308", res.get(5));
+    assertEquals("-10000000000000.1000000000000000000000", res.get(6));
+  }
+
+  @Test
+  public void testEmptyRow() {
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    Schema schema = schemaBuilder.build();
+
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    Row row = rowBuilder.build();
+
+    SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+    List<String> res = mapper.mapRow(row);
+
+    assertEquals(0, res.size());
+  }
+
+  @Test
+  public void testAllDataTypes() {
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    schemaBuilder.addField("byte", Schema.FieldType.BYTE);
+    schemaBuilder.addField("int16", Schema.FieldType.INT16);
+    schemaBuilder.addField("int32", Schema.FieldType.INT32);
+    schemaBuilder.addField("int64", Schema.FieldType.INT64);
+    schemaBuilder.addField("float", Schema.FieldType.FLOAT);
+    schemaBuilder.addField("double", Schema.FieldType.DOUBLE);
+    schemaBuilder.addField("decimal", Schema.FieldType.DECIMAL);
+    schemaBuilder.addField("boolean", Schema.FieldType.BOOLEAN);
+    schemaBuilder.addField("datetime", Schema.FieldType.DATETIME);
+    schemaBuilder.addField("bytes", Schema.FieldType.BYTES);
+    schemaBuilder.addField("string", Schema.FieldType.STRING);
+    Schema schema = schemaBuilder.build();
+
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    rowBuilder.addValue((byte) 10);
+    rowBuilder.addValue((short) 10);
+    rowBuilder.addValue(10);
+    rowBuilder.addValue((long) 10);
+    rowBuilder.addValue((float) 10.1);
+    rowBuilder.addValue(10.1);
+    rowBuilder.addValue(new BigDecimal("10.1"));
+    rowBuilder.addValue(false);
+    rowBuilder.addValue(new DateTime("2022-01-01T10:10:10.012Z"));
+    rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+    rowBuilder.addValue("asd");
+    Row row = rowBuilder.build();
+
+    SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+    List<String> res = mapper.mapRow(row);
+
+    assertEquals(11, res.size());
+    assertEquals("10", res.get(0));
+    assertEquals("10", res.get(1));
+    assertEquals("10", res.get(2));
+    assertEquals("10", res.get(3));
+    assertEquals("10.1", res.get(4));
+    assertEquals("10.1", res.get(5));
+    assertEquals("10.1", res.get(6));
+    assertEquals("0", res.get(7));
+    assertEquals("2022-01-01 10:10:10.012", res.get(8));
+    assertEquals("asd", res.get(9));
+    assertEquals("asd", res.get(10));
+  }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.java
new file mode 100644
index 000000000000..32fc9d1a11f4
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SingleStoreIODefaultMapperIT {
+  private static final String DATABASE_NAME = "SingleStoreIOIT";
+
+  private static int numberOfRows;
+
+  private static String tableName;
+
+  private static String serverName;
+
+  private static String username;
+
+  private static String password;
+
+  private static Integer port;
+
+  private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;
+
+  private static Schema schema;
+
+  @BeforeClass
+  public static void setup() {
+    SingleStoreIOTestPipelineOptions options;
+    try {
+      options = readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
+    } catch (IllegalArgumentException e) {
+      options = null;
+    }
+    org.junit.Assume.assumeNotNull(options);
+
+    numberOfRows = options.getNumberOfRecords();
+    serverName = options.getSingleStoreServerName();
+    username = options.getSingleStoreUsername();
+    password = options.getSingleStorePassword();
+    port = options.getSingleStorePort();
+    tableName = DatabaseTestHelper.getTestTableName("IT");
+    dataSourceConfiguration =
+        SingleStoreIO.DataSourceConfiguration.create(serverName + ":" + port)
+            .withDatabase(DATABASE_NAME)
+            .withPassword(password)
+            .withUsername(username);
+
+    generateSchema();
+  }
+
+  private static void generateSchema() {
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    schemaBuilder.addField("c1", Schema.FieldType.BOOLEAN);
+    schemaBuilder.addField("c2", Schema.FieldType.BYTE);
+    schemaBuilder.addField("c3", Schema.FieldType.INT16);
+    schemaBuilder.addField("c4", Schema.FieldType.INT32);
+    schemaBuilder.addField("c5", Schema.FieldType.INT64);
+    schemaBuilder.addField("c6", Schema.FieldType.FLOAT);
+    schemaBuilder.addField("c7", Schema.FieldType.DOUBLE);
+    schemaBuilder.addField("c8", Schema.FieldType.DECIMAL);
+    schemaBuilder.addField("c9", Schema.FieldType.DATETIME);
+    schemaBuilder.addField("c10", Schema.FieldType.DATETIME);
+    schemaBuilder.addField("c11", Schema.FieldType.DATETIME);
+    schemaBuilder.addField("c12", Schema.FieldType.BYTES);
+    schemaBuilder.addField("c13", Schema.FieldType.BYTES);
+    schemaBuilder.addField("c14", Schema.FieldType.BYTES);
+    schemaBuilder.addField("c15", Schema.FieldType.STRING);
+    schemaBuilder.addField("c16", Schema.FieldType.STRING);
+    schemaBuilder.addField("c17", Schema.FieldType.STRING);
+
+    schema = schemaBuilder.build();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteThenRead() throws Exception {
+    TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
+    createTable();
+
+    try {
+      PipelineResult writeResult = runWrite();
+      assertEquals(PipelineResult.State.DONE, writeResult.waitUntilFinish());
+      PipelineResult readResult = runRead();
+      assertEquals(PipelineResult.State.DONE, readResult.waitUntilFinish());
+    } finally {
+      DataSource dataSource = dataSourceConfiguration.getDataSource();
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
+    }
+  }
+
+  private void createTable() throws SQLException {
+    DataSource dataSource = dataSourceConfiguration.getDataSource();
+    Connection conn = dataSource.getConnection();
+    try {
+      Statement stmt = conn.createStatement();
+      stmt.executeUpdate("DROP TABLE IF EXISTS " + tableName);
+      stmt.executeUpdate(
+          "CREATE TABLE "
+              + tableName
+              + "("
+              + "c1 BIT, "
+              + "c2 TINYINT, "
+              + "c3 SMALLINT, "
+              + "c4 INTEGER, "
+              + "c5 BIGINT, "
+              + "c6 FLOAT, "
+              + "c7 DOUBLE, "
+              + "c8 DECIMAL(10, 5), "
+              + "c9 TIMESTAMP, "
+              + "c10 DATE, "
+              + "c11 TIME, "
+              + "c12 BLOB, "
+              + "c13 TINYBLOB, "
+              + "c14 BINARY(10), "
+              + "c15 MEDIUMTEXT, "
+              + "c16 TINYTEXT, "
+              + "c17 CHAR(10) "
+              + ")");
+    } finally {
+      conn.close();
+    }
+  }
+
+  @Rule public TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public TestPipeline pipelineRead = TestPipeline.create();
+
+  public static class ConstructRowFn extends DoFn<Long, Row> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+
+      Row.Builder rowBuilder = Row.withSchema(schema);
+      rowBuilder.addValue(Boolean.TRUE);
+      rowBuilder.addValue((byte) 10);
+      rowBuilder.addValue((short) 10);
+      rowBuilder.addValue(10);
+      rowBuilder.addValue((long) 10);
+      rowBuilder.addValue((float) 10.1);
+      rowBuilder.addValue(10.1);
+      rowBuilder.addValue(new BigDecimal("10.1"));
+      rowBuilder.addValue(new DateTime("2022-01-01T10:10:10Z"));
+      rowBuilder.addValue(new DateTime("2022-01-01T10:10:10Z"));
+      rowBuilder.addValue(new DateTime("2022-01-01T10:10:10Z"));
+      rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+      rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+      rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+      rowBuilder.addValue("asd");
+      rowBuilder.addValue("asd");
+      rowBuilder.addValue("asd");
+      Row row = rowBuilder.build();
+
+      c.output(row);
+    }
+  }
+
+  private PipelineResult runWrite() {
+    pipelineWrite
+        .apply(GenerateSequence.from(0).to(numberOfRows))
+        .apply(ParDo.of(new ConstructRowFn()))
+        .setRowSchema(schema)
+        .apply(
+            SingleStoreIO.writeRows()
+                .withDataSourceConfiguration(dataSourceConfiguration)
+                .withTable(tableName));
+
+    return pipelineWrite.run();
+  }
+
+  private PipelineResult runRead() {
+    PCollection<Row> res =
+        pipelineRead.apply(
+            SingleStoreIO.readRows()
+                .withDataSourceConfiguration(dataSourceConfiguration)
+                .withTable(tableName));
+
+    PAssert.thatSingleton(res.apply("Count All", Count.globally())).isEqualTo((long) numberOfRows);
+
+    res.apply(ParDo.of(new CheckDoFn())).setCoder(VoidCoder.of());
+
+    return pipelineRead.run();
+  }
+
+  private static class CheckDoFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void process(ProcessContext c) {
+      Row r = c.element();
+      Assert.assertNotNull(r);
+      assertEquals(Boolean.TRUE, r.getBoolean(0));
+      assertEquals((Byte) (byte) 10, r.getByte(1));
+      assertEquals((Short) (short) 10, r.getInt16(2));
+      assertEquals((Integer) 10, r.getInt32(3));
+      assertEquals((Long) (long) 10, r.getInt64(4));
+      assertEquals((Float) (float) 10.1, r.getFloat(5));
+      assertEquals((Double) 10.1, r.getDouble(6));
+      assertEquals(new BigDecimal("10.10000"), r.getDecimal(7));
+      assertEquals(0, new DateTime("2022-01-01T10:10:10Z").compareTo(r.getDateTime(8)));
+      assertEquals(0, new DateTime("2022-01-01T00:00:00Z").compareTo(r.getDateTime(9)));
+      assertEquals(0, new DateTime("1970-01-01T10:10:10Z").compareTo(r.getDateTime(10)));
+      assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(11));
+      assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(12));
+      assertArrayEquals("asd\0\0\0\0\0\0\0".getBytes(StandardCharsets.UTF_8), r.getBytes(13));
+      assertEquals("asd", r.getString(14));
+      assertEquals("asd", r.getString(15));
+      assertEquals("asd", r.getString(16));
+    }
+  }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOPerformanceIT.java
similarity index 92%
rename from sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
rename to sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOPerformanceIT.java
index cdbcacd04434..8adf8a47dc13 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOPerformanceIT.java
@@ -22,9 +22,6 @@
 import com.google.cloud.Timestamp;
 import com.singlestore.jdbc.SingleStoreDataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -58,9 +55,9 @@
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-public class SingleStoreIOIT {
+public class SingleStoreIOPerformanceIT {
 
-  private static final String NAMESPACE = SingleStoreIOIT.class.getName();
+  private static final String NAMESPACE = SingleStoreIOPerformanceIT.class.getName();
 
   private static final String DATABASE_NAME = "SingleStoreIOIT";
 
@@ -109,22 +106,10 @@ public static void setup() {
         .get();
   }
 
-  void createDatabaseIfNotExists() throws SQLException {
-    DataSource dataSource =
-        new SingleStoreDataSource(
-            String.format(
-                "jdbc:singlestore://%s:%d/?user=%s&password=%s&allowLocalInfile=TRUE",
-                serverName, port, username, password));
-    try (Connection conn = dataSource.getConnection();
-        Statement stmt = conn.createStatement()) {
-      stmt.executeQuery(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_NAME));
-    }
-  }
-
   @Test
   @Category(NeedsRunner.class)
   public void testWriteThenRead() throws Exception {
-    createDatabaseIfNotExists();
+    TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
     DataSource dataSource =
         new SingleStoreDataSource(
             String.format(
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java
new file mode 100644
index 000000000000..4ded4cd452a8
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.singlestore.jdbc.SingleStoreDataSource;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadConfiguration;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadProvider;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteConfiguration;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SingleStoreIOSchemaTransformIT {
+
+  private static final String DATABASE_NAME = "SingleStoreIOIT";
+
+  private static int numberOfRows;
+
+  private static String tableName;
+
+  private static String serverName;
+
+  private static String username;
+
+  private static String password;
+
+  private static Integer port;
+
+  private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;
+
+  @BeforeClass
+  public static void setup() {
+    SingleStoreIOTestPipelineOptions options;
+    try {
+      options = readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
+    } catch (IllegalArgumentException e) {
+      options = null;
+    }
+    org.junit.Assume.assumeNotNull(options);
+
+    numberOfRows = options.getNumberOfRecords();
+    serverName = options.getSingleStoreServerName();
+    username = options.getSingleStoreUsername();
+    password = options.getSingleStorePassword();
+    port = options.getSingleStorePort();
+    tableName = DatabaseTestHelper.getTestTableName("IT");
+    dataSourceConfiguration =
+        SingleStoreIO.DataSourceConfiguration.create(serverName + ":" + port)
+            .withDatabase(DATABASE_NAME)
+            .withPassword(password)
+            .withUsername(username);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteThenRead() throws Exception {
+    TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
+    DataSource dataSource =
+        new SingleStoreDataSource(
+            String.format(
+                "jdbc:singlestore://%s:%d/%s?user=%s&password=%s&allowLocalInfile=TRUE",
+                serverName, port, DATABASE_NAME, username, password));
+    DatabaseTestHelper.createTable(dataSource, tableName);
+    try {
+      PipelineResult writeResult = runWrite();
+      assertEquals(PipelineResult.State.DONE, writeResult.waitUntilFinish());
+      PipelineResult readResult = runRead();
+      assertEquals(PipelineResult.State.DONE, readResult.waitUntilFinish());
+      PipelineResult readResultWithPartitions = runReadWithPartitions();
+      assertEquals(PipelineResult.State.DONE, readResultWithPartitions.waitUntilFinish());
+    } finally {
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
+    }
+  }
+
+  @Rule public TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public TestPipeline pipelineRead = TestPipeline.create();
+  @Rule public TestPipeline pipelineReadWithPartitions = TestPipeline.create();
+
+  private PipelineResult runWrite() {
+    SchemaTransformProvider provider = new SingleStoreSchemaTransformWriteProvider();
+
+    SingleStoreSchemaTransformWriteConfiguration configuration =
+        SingleStoreSchemaTransformWriteConfiguration.builder()
+            .setDataSourceConfiguration(dataSourceConfiguration)
+            .setTable(tableName)
+            .setBatchSize(100)
+            .build();
+
+    Row configurationRow = configuration.toBeamRow();
+    SchemaTransform schemaTransform = provider.from(configurationRow);
+    PTransform<PCollectionRowTuple, PCollectionRowTuple> pCollectionRowTupleTransform =
+        schemaTransform.buildTransform();
+
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    schemaBuilder.addField("id", Schema.FieldType.INT32);
+    schemaBuilder.addField("name", Schema.FieldType.STRING);
+    Schema schema = schemaBuilder.build();
+
+    PCollection<Row> rows =
+        pipelineWrite
+            .apply(GenerateSequence.from(0).to(numberOfRows))
+            .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
+            .apply(
+                "Convert TestRows to Rows",
+                MapElements.into(TypeDescriptor.of(Row.class))
+                    .via(
+                        (SerializableFunction<TestRow, Row>)
+                            testRow -> {
+                              Row.Builder rowBuilder = Row.withSchema(schema);
+                              rowBuilder.addValue(testRow.id());
+                              rowBuilder.addValue(testRow.name());
+                              return rowBuilder.build();
+                            }))
+            .setRowSchema(schema);
+
+    PCollectionRowTuple input =
+        PCollectionRowTuple.of(SingleStoreSchemaTransformWriteProvider.INPUT_TAG, rows);
+    String tag = provider.outputCollectionNames().get(0);
+    PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+    assertTrue(output.has(tag));
+    PCollection<Integer> writtenRows =
+        output
+            .get(tag)
+            .apply(
+                "Convert Rows to Integers",
+                MapElements.into(TypeDescriptor.of(Integer.class))
+                    .via((SerializableFunction<Row, Integer>) row -> row.getInt32(0)));
+
+    PAssert.thatSingleton(writtenRows.apply("Sum All", Sum.integersGlobally()))
+        .isEqualTo(numberOfRows);
+
+    return pipelineWrite.run();
+  }
+
+  private PipelineResult runRead() {
+    SchemaTransformProvider provider = new SingleStoreSchemaTransformReadProvider();
+
+    SingleStoreSchemaTransformReadConfiguration configuration =
+        SingleStoreSchemaTransformReadConfiguration.builder()
+            .setDataSourceConfiguration(dataSourceConfiguration)
+            .setTable(tableName)
+            .setOutputParallelization(true)
+            .build();
+
+    Row configurationRow = configuration.toBeamRow();
+    SchemaTransform schemaTransform = provider.from(configurationRow);
+    PTransform<PCollectionRowTuple, PCollectionRowTuple> pCollectionRowTupleTransform =
+        schemaTransform.buildTransform();
+
+    PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineRead);
+    String tag = provider.outputCollectionNames().get(0);
+    PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+    assertTrue(output.has(tag));
+    PCollection<TestRow> namesAndIds =
+        output
+            .get(tag)
+            .apply(
+                MapElements.into(TypeDescriptor.of(TestRow.class))
+                    .via(
+                        (SerializableFunction<Row, TestRow>)
+                            row -> TestRow.create(row.getInt32(0), row.getString(1))));
+
+    testReadResult(namesAndIds);
+
+    return pipelineRead.run();
+  }
+
+  private PipelineResult runReadWithPartitions() {
+    SchemaTransformProvider provider = new SingleStoreSchemaTransformReadProvider();
+
+    SingleStoreSchemaTransformReadConfiguration configuration =
+        SingleStoreSchemaTransformReadConfiguration.builder()
+            .setDataSourceConfiguration(dataSourceConfiguration)
+            .setTable(tableName)
+            .setWithPartitions(true)
+            .build();
+
+    Row configurationRow = configuration.toBeamRow();
+    SchemaTransform schemaTransform = provider.from(configurationRow);
+    PTransform<PCollectionRowTuple, PCollectionRowTuple> pCollectionRowTupleTransform =
+        schemaTransform.buildTransform();
+
+    PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineReadWithPartitions);
+    String tag = provider.outputCollectionNames().get(0);
+    PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+    assertTrue(output.has(tag));
+    PCollection<TestRow> namesAndIds =
+        output
+            .get(tag)
+            .apply(
+                MapElements.into(TypeDescriptor.of(TestRow.class))
+                    .via(
+                        (SerializableFunction<Row, TestRow>)
+                            row -> TestRow.create(row.getInt32(0), row.getString(1))));
+
+    testReadResult(namesAndIds);
+
+    return pipelineReadWithPartitions.run();
+  }
+
+  private void testReadResult(PCollection<TestRow> namesAndIds) {
+    PAssert.thatSingleton(namesAndIds.apply("Count All", Count.globally()))
+        .isEqualTo((long) numberOfRows);
+
+    PCollection<String> consolidatedHashcode =
+        namesAndIds
+            .apply(ParDo.of(new TestRow.SelectNameFn()))
+            .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
+    PAssert.that(consolidatedHashcode)
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+
+    PCollection<List<TestRow>> frontOfList = namesAndIds.apply(Top.smallest(500));
+    Iterable<TestRow> expectedFrontOfList = TestRow.getExpectedValues(0, 500);
+    PAssert.thatSingletonIterable(frontOfList).containsInAnyOrder(expectedFrontOfList);
+
+    PCollection<List<TestRow>> backOfList = namesAndIds.apply(Top.largest(500));
+    Iterable<TestRow> expectedBackOfList =
+        TestRow.getExpectedValues(numberOfRows - 500, numberOfRows);
+    PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList);
+  }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java
index 4b743e43cd1b..6f50d419368e 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java
@@ -79,6 +79,23 @@ public TestRow mapRow(ResultSet resultSet) throws Exception {
     }
   }
 
+  private static class TestRowMapperWithCoder extends TestRowMapper
+      implements SingleStoreIO.RowMapperWithCoder<TestRow> {
+    @Override
+    public Coder<TestRow> getCoder() throws Exception {
+      return SerializableCoder.of(TestRow.class);
+    }
+  }
+
+  @Test
+  public void testInferCoderFromRowMapper() {
+    SchemaRegistry sr = SchemaRegistry.createDefault();
+    CoderRegistry cr = CoderRegistry.createDefault();
+    Coder<TestRow> c = SerializableCoder.of(TestRow.class);
+
+    assertEquals(c, SingleStoreUtil.inferCoder(new TestRowMapperWithCoder(), cr, sr, LOG));
+  }
+
   @Test
   public void testInferCoderFromSchemaRegistry() {
     SchemaRegistry sr = SchemaRegistry.createDefault();
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java
index 7c0acc539098..e04785db4ee3 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java
@@ -17,9 +17,14 @@
  */
 package org.apache.beam.sdk.io.singlestore;
 
+import com.singlestore.jdbc.SingleStoreDataSource;
+import java.sql.Connection;
 import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
+import javax.sql.DataSource;
 import org.apache.beam.sdk.io.common.TestRow;
 
 public class TestHelper {
@@ -67,4 +72,18 @@ String getPassword() {
       return "secretPass";
     }
   }
+
+  public static void createDatabaseIfNotExists(
+      String serverName, Integer port, String username, String password, String databaseName)
+      throws SQLException {
+    DataSource dataSource =
+        new SingleStoreDataSource(
+            String.format(
+                "jdbc:singlestore://%s:%d/?user=%s&password=%s&allowLocalInfile=TRUE",
+                serverName, port, username, password));
+    try (Connection conn = dataSource.getConnection();
+        Statement stmt = conn.createStatement()) {
+      stmt.executeQuery(String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName));
+    }
+  }
 }
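
Reviewer note, not part of the patch: the new SingleStoreIO.RowMapperWithCoder hook exercised by testInferCoderFromRowMapper can be implemented in user code roughly as in the sketch below. The ExampleRowMapper name, the package placement, and the two-column (id, name) layout are illustrative assumptions; only the interface, the mapRow signature, and SerializableCoder.of(TestRow.class) come from the patch itself.

package org.apache.beam.sdk.io.singlestore;

import java.sql.ResultSet;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.common.TestRow;

/** Illustrative mapper that both maps a JDBC row and declares the coder for its output type. */
class ExampleRowMapper implements SingleStoreIO.RowMapperWithCoder<TestRow> {
  @Override
  public TestRow mapRow(ResultSet resultSet) throws Exception {
    // Assumes column 1 holds the id and column 2 the name, as in the TestRow-based ITs above.
    return TestRow.create(resultSet.getInt(1), resultSet.getString(2));
  }

  @Override
  public Coder<TestRow> getCoder() throws Exception {
    // SingleStoreUtil.inferCoder(...) returns this coder, as asserted in testInferCoderFromRowMapper.
    return SerializableCoder.of(TestRow.class);
  }
}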