Skip to content

Commit 175636b

Browse files
authored
Frame writers: Coerce numeric and array types in certain cases. (#16994)
This patch adds "TypeCastSelectors", which is used when writing frames to perform two coercions:
- When a numeric type is desired and the underlying type is non-numeric or unknown, the underlying selector is wrapped, "getObject" is called, and the result is coerced using "ExprEval.ofType". This differs from the prior behavior, where the primitive methods such as "getLong", "getDouble", etc., were called directly. This fixes an issue where a column would be read as all zeroes when its SQL type is numeric and its physical type is string, which can happen when evolving a column's type from string to number.
- When an array type is desired, the underlying selector is wrapped, "getObject" is called, and the result is coerced to Object[]. This coercion replaces some earlier logic from #15917.
1 parent c49dc83 commit 175636b

File tree

15 files changed

+835
-117
lines changed

15 files changed

+835
-117
lines changed

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public GroupByPostShuffleFrameProcessor(
109109
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
110110
query,
111111
() -> outputRow,
112-
RowSignature.Finalization.YES
112+
GroupByQueryKit.isFinalize(query) ? RowSignature.Finalization.YES : RowSignature.Finalization.NO
113113
)
114114
);
115115
}

integration-tests-ex/cases/src/test/resources/catalog/incompatibleTypeAssignmentWithValidationDisabled_select.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"expectedResults": [
55
{
66
"__time": 1672058096000,
7-
"double_col": 0.0
7+
"double_col": null
88
}
99
]
1010
}

processing/src/main/java/org/apache/druid/frame/field/FieldWriters.java

+17-8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.druid.frame.key.RowKey;
2323
import org.apache.druid.frame.write.RowBasedFrameWriterFactory;
2424
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
25+
import org.apache.druid.frame.write.cast.TypeCastSelectors;
2526
import org.apache.druid.java.util.common.ISE;
2627
import org.apache.druid.query.dimension.DefaultDimensionSpec;
2728
import org.apache.druid.segment.ColumnSelectorFactory;
@@ -101,7 +102,8 @@ private static FieldWriter makeLongWriter(
101102
final String columnName
102103
)
103104
{
104-
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
105+
final ColumnValueSelector<?> selector =
106+
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.LONG);
105107
return LongFieldWriter.forPrimitive(selector);
106108
}
107109

@@ -110,7 +112,8 @@ private static FieldWriter makeFloatWriter(
110112
final String columnName
111113
)
112114
{
113-
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
115+
final ColumnValueSelector<?> selector =
116+
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.FLOAT);
114117
return FloatFieldWriter.forPrimitive(selector);
115118
}
116119

@@ -119,7 +122,8 @@ private static FieldWriter makeDoubleWriter(
119122
final String columnName
120123
)
121124
{
122-
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
125+
final ColumnValueSelector<?> selector =
126+
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.DOUBLE);
123127
return DoubleFieldWriter.forPrimitive(selector);
124128
}
125129

@@ -139,7 +143,8 @@ private static FieldWriter makeStringArrayWriter(
139143
final boolean removeNullBytes
140144
)
141145
{
142-
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
146+
final ColumnValueSelector<?> selector =
147+
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.STRING_ARRAY);
143148
return new StringArrayFieldWriter(selector, removeNullBytes);
144149
}
145150

@@ -148,7 +153,8 @@ private static FieldWriter makeLongArrayWriter(
148153
final String columnName
149154
)
150155
{
151-
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
156+
final ColumnValueSelector<?> selector =
157+
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.LONG_ARRAY);
152158
return NumericArrayFieldWriter.getLongArrayFieldWriter(selector);
153159
}
154160

@@ -157,7 +163,8 @@ private static FieldWriter makeFloatArrayWriter(
157163
final String columnName
158164
)
159165
{
160-
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
166+
final ColumnValueSelector<?> selector =
167+
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.FLOAT_ARRAY);
161168
return NumericArrayFieldWriter.getFloatArrayFieldWriter(selector);
162169
}
163170

@@ -166,7 +173,8 @@ private static FieldWriter makeDoubleArrayWriter(
166173
final String columnName
167174
)
168175
{
169-
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
176+
final ColumnValueSelector<?> selector =
177+
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.DOUBLE_ARRAY);
170178
return NumericArrayFieldWriter.getDoubleArrayFieldWriter(selector);
171179
}
172180

@@ -185,7 +193,8 @@ private static FieldWriter makeComplexWriter(
185193
throw new ISE("No serde for complexTypeName[%s], cannot write column [%s]", columnTypeName, columnName);
186194
}
187195

188-
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
196+
final ColumnValueSelector<?> selector =
197+
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.ofComplex(columnTypeName));
189198
return new ComplexFieldWriter(serde, selector);
190199
}
191200
}

processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldSelector.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
*
4040
* @param <ElementType> Type of the individual array elements
4141
*/
42-
public abstract class NumericArrayFieldSelector<ElementType extends Number> implements ColumnValueSelector
42+
public abstract class NumericArrayFieldSelector<ElementType extends Number> implements ColumnValueSelector<Object[]>
4343
{
4444
/**
4545
* Memory containing the serialized values of the array
@@ -81,15 +81,15 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
8181

8282
@Nullable
8383
@Override
84-
public Object getObject()
84+
public Object[] getObject()
8585
{
8686
return computeCurrentArray();
8787
}
8888

8989
@Override
90-
public Class classOfObject()
90+
public Class<Object[]> classOfObject()
9191
{
92-
return Object.class;
92+
return Object[].class;
9393
}
9494

9595
@Override
@@ -131,7 +131,7 @@ public boolean isNull()
131131
public abstract int getIndividualFieldSize();
132132

133133
@Nullable
134-
private Number[] computeCurrentArray()
134+
private Object[] computeCurrentArray()
135135
{
136136
final long fieldPosition = fieldPointer.position();
137137
final long fieldLength = fieldPointer.length();

processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldWriter.java

+4-18
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@
2121

2222
import org.apache.datasketches.memory.WritableMemory;
2323
import org.apache.druid.common.config.NullHandling;
24-
import org.apache.druid.frame.write.FrameWriterUtils;
2524
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
2625
import org.apache.druid.segment.ColumnValueSelector;
2726

2827
import javax.annotation.Nullable;
29-
import java.util.List;
3028
import java.util.concurrent.atomic.AtomicInteger;
3129

3230
/**
@@ -128,7 +126,7 @@ public NumericArrayFieldWriter(final ColumnValueSelector selector, NumericFieldW
128126
@Override
129127
public long writeTo(WritableMemory memory, long position, long maxSize)
130128
{
131-
Object row = selector.getObject();
129+
final Object[] row = (Object[]) selector.getObject();
132130
if (row == null) {
133131
int requiredSize = Byte.BYTES;
134132
if (requiredSize > maxSize) {
@@ -137,18 +135,6 @@ public long writeTo(WritableMemory memory, long position, long maxSize)
137135
memory.putByte(position, NULL_ROW);
138136
return requiredSize;
139137
} else {
140-
141-
List<? extends Number> list = FrameWriterUtils.getNumericArrayFromObject(row);
142-
143-
if (list == null) {
144-
int requiredSize = Byte.BYTES;
145-
if (requiredSize > maxSize) {
146-
return -1;
147-
}
148-
memory.putByte(position, NULL_ROW);
149-
return requiredSize;
150-
}
151-
152138
// Create a columnValueSelector to write the individual elements re-using the NumericFieldWriter
153139
AtomicInteger index = new AtomicInteger(0);
154140
ColumnValueSelector<Number> columnValueSelector = new ColumnValueSelector<Number>()
@@ -199,7 +185,7 @@ public boolean isNull()
199185
@Override
200186
public Number getObject()
201187
{
202-
return list.get(index.get());
188+
return (Number) row[index.get()];
203189
}
204190

205191
@Override
@@ -215,7 +201,7 @@ public Class<? extends Number> classOfObject()
215201
// Next [(1 + Numeric Size) x Number of elements of array] bytes are reserved for the elements of the array and
216202
// their null markers
217203
// Last byte is reserved for array termination
218-
int requiredSize = Byte.BYTES + (writer.getNumericSizeBytes() + Byte.BYTES) * list.size() + Byte.BYTES;
204+
int requiredSize = Byte.BYTES + (writer.getNumericSizeBytes() + Byte.BYTES) * row.length + Byte.BYTES;
219205

220206
if (requiredSize > maxSize) {
221207
return -1;
@@ -225,7 +211,7 @@ public Class<? extends Number> classOfObject()
225211
memory.putByte(position + offset, NON_NULL_ROW);
226212
offset += Byte.BYTES;
227213

228-
for (; index.get() < list.size(); index.incrementAndGet()) {
214+
for (; index.get() < row.length; index.incrementAndGet()) {
229215
writer.writeTo(
230216
memory,
231217
position + offset,

processing/src/main/java/org/apache/druid/frame/write/FrameWriterUtils.java

+6-49
Original file line numberDiff line numberDiff line change
@@ -144,60 +144,16 @@ public static List<ByteBuffer> getUtf8ByteBuffersFromStringArraySelector(
144144
@SuppressWarnings("rawtypes") final BaseObjectColumnValueSelector selector
145145
)
146146
{
147-
Object row = selector.getObject();
147+
final Object[] row = (Object[]) selector.getObject();
148148
if (row == null) {
149149
return null;
150-
} else if (row instanceof String) {
151-
return Collections.singletonList(getUtf8ByteBufferFromString((String) row));
152-
}
153-
154-
final List<ByteBuffer> retVal = new ArrayList<>();
155-
if (row instanceof List) {
156-
for (int i = 0; i < ((List<?>) row).size(); i++) {
157-
retVal.add(getUtf8ByteBufferFromString(((List<String>) row).get(i)));
158-
}
159-
} else if (row instanceof Object[]) {
160-
for (Object value : (Object[]) row) {
161-
retVal.add(getUtf8ByteBufferFromString((String) value));
162-
}
163150
} else {
164-
throw new ISE("Unexpected type %s found", row.getClass().getName());
165-
}
166-
return retVal;
167-
}
168-
169-
/**
170-
* Retrieves a numeric list from a Java object, given that the object is an instance of something that can be returned
171-
* from {@link ColumnValueSelector#getObject()} of valid numeric array selectors representations
172-
*
173-
* While {@link BaseObjectColumnValueSelector} specifies that only instances of {@code Object[]} can be returned from
174-
* the numeric array selectors, this method also handles a few more cases which can be encountered if the selector is
175-
* directly implemented on top of the group by stuff
176-
*/
177-
@Nullable
178-
public static List<? extends Number> getNumericArrayFromObject(Object row)
179-
{
180-
if (row == null) {
181-
return null;
182-
} else if (row instanceof Number) {
183-
return Collections.singletonList((Number) row);
184-
}
185-
186-
final List<Number> retVal = new ArrayList<>();
187-
188-
if (row instanceof List) {
189-
for (int i = 0; i < ((List<?>) row).size(); i++) {
190-
retVal.add((Number) ((List<?>) row).get(i));
191-
}
192-
} else if (row instanceof Object[]) {
193-
for (Object value : (Object[]) row) {
194-
retVal.add((Number) value);
151+
final List<ByteBuffer> retVal = new ArrayList<>();
152+
for (Object value : row) {
153+
retVal.add(getUtf8ByteBufferFromString((String) value));
195154
}
196-
} else {
197-
throw new ISE("Unexpected type %s found", row.getClass().getName());
155+
return retVal;
198156
}
199-
200-
return retVal;
201157
}
202158

203159
/**
@@ -275,6 +231,7 @@ public static void copyByteBufferToMemoryDisallowingNullBytes(
275231
* Whenever "allowNullBytes" is true, "removeNullBytes" must be false. Use the methods {@link #copyByteBufferToMemoryAllowingNullBytes}
276232
* and {@link #copyByteBufferToMemoryDisallowingNullBytes} to copy between the memory
277233
* <p>
234+
*
278235
* @throws InvalidNullByteException if both "allowNullBytes" and "removeNullBytes" are false and a null byte is encountered
279236
*/
280237
private static void copyByteBufferToMemory(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.frame.write.cast;
21+
22+
import org.apache.druid.error.DruidException;
23+
import org.apache.druid.math.expr.ExprEval;
24+
import org.apache.druid.math.expr.ExpressionType;
25+
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
26+
import org.apache.druid.segment.ColumnValueSelector;
27+
import org.apache.druid.segment.RowIdSupplier;
28+
29+
import javax.annotation.Nullable;
30+
31+
/**
32+
* Wraps a {@link ColumnValueSelector}, calls {@link ColumnValueSelector#getObject()}, interprets that value using
33+
* {@link ExprEval#ofType}, and casts it using {@link ExprEval#castTo}.
34+
*/
35+
public class ObjectToArrayColumnValueSelector implements ColumnValueSelector<Object[]>
36+
{
37+
private final ColumnValueSelector<?> selector;
38+
@Nullable
39+
private final ExpressionType desiredType;
40+
@Nullable
41+
private final RowIdSupplier rowIdSupplier;
42+
43+
public ObjectToArrayColumnValueSelector(
44+
final ColumnValueSelector<?> selector,
45+
final ExpressionType desiredType,
46+
@Nullable final RowIdSupplier rowIdSupplier
47+
)
48+
{
49+
this.selector = selector;
50+
this.desiredType = desiredType;
51+
this.rowIdSupplier = rowIdSupplier;
52+
53+
if (!desiredType.isArray() || desiredType.getElementType() == null) {
54+
throw DruidException.defensive("Expected array with nonnull element type, got[%s]", desiredType);
55+
}
56+
}
57+
58+
@Override
59+
public double getDouble()
60+
{
61+
throw DruidException.defensive("Unexpected call to getDouble on array selector");
62+
}
63+
64+
@Override
65+
public float getFloat()
66+
{
67+
throw DruidException.defensive("Unexpected call to getFloat on array selector");
68+
}
69+
70+
@Override
71+
public long getLong()
72+
{
73+
throw DruidException.defensive("Unexpected call to getLong on array selector");
74+
}
75+
76+
@Override
77+
public boolean isNull()
78+
{
79+
throw DruidException.defensive("Unexpected call to isNull on array selector");
80+
}
81+
82+
@Nullable
83+
@Override
84+
public Object[] getObject()
85+
{
86+
return (Object[]) TypeCastSelectors.bestEffortCoerce(selector.getObject(), desiredType);
87+
}
88+
89+
@Override
90+
public Class<Object[]> classOfObject()
91+
{
92+
return Object[].class;
93+
}
94+
95+
@Override
96+
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
97+
{
98+
inspector.visit("selector", selector);
99+
inspector.visit("rowIdSupplier", rowIdSupplier);
100+
}
101+
}

0 commit comments

Comments
 (0)