From 6ae639196440142fb2a05269f946f719a8fb08a7 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Thu, 13 Feb 2025 14:31:32 +0800 Subject: [PATCH 1/4] Support Filter and Project pushdown Signed-off-by: Heng Qian --- .../sql/calcite/plan/OpenSearchTable.java | 5 +- .../request/OpenSearchRequestBuilder.java | 8 +- .../opensearch/request/PredicateAnalyzer.java | 1076 +++++++++++++++++ .../opensearch/storage/OpenSearchIndex.java | 45 +- .../scan/OpenSearchIndexEnumerator.java | 4 +- 5 files changed, 1128 insertions(+), 10 deletions(-) create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java index 64f7ebc8e5..ac54ffa46e 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java @@ -16,13 +16,14 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ProjectableFilterableTable; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Schemas; import org.apache.calcite.schema.TranslatableTable; import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes; public abstract class OpenSearchTable extends AbstractQueryableTable - implements TranslatableTable, org.opensearch.sql.storage.Table { + implements ProjectableFilterableTable, TranslatableTable, org.opensearch.sql.storage.Table { protected OpenSearchTable(Type elementType) { super(elementType); @@ -55,5 +56,5 @@ public Expression getExpression(SchemaPlus schema, String tableName, Class clazz return Schemas.tableExpression(schema, getElementType(), tableName, clazz); } - public abstract Enumerable search(); + public abstract Enumerable search(); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index 6fa9b17697..3b333629da 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -280,9 +280,11 @@ public void pushDownHighlight(String field, Map arguments) { /** Push down project list to DSL requests. */ public void pushDownProjects(Set projects) { - sourceBuilder.fetchSource( - projects.stream().map(ReferenceExpression::getAttr).distinct().toArray(String[]::new), - new String[0]); + pushDownProjectStream(projects.stream().map(ReferenceExpression::getAttr)); + } + + public void pushDownProjectStream(Stream projects) { + sourceBuilder.fetchSource(projects.distinct().toArray(String[]::new), new String[0]); } public void pushTypeMapping(Map typeMapping) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java new file mode 100644 index 0000000000..34dd969ff8 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -0,0 +1,1076 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.opensearch.sql.opensearch.request; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.opensearch.index.query.QueryBuilders.boolQuery; +import static org.opensearch.index.query.QueryBuilders.existsQuery; +import static org.opensearch.index.query.QueryBuilders.matchQuery; +import static org.opensearch.index.query.QueryBuilders.rangeQuery; +import static org.opensearch.index.query.QueryBuilders.regexpQuery; +import static org.opensearch.index.query.QueryBuilders.termQuery; +import static org.opensearch.index.query.QueryBuilders.termsQuery; + +import com.google.common.base.Throwables; +import com.google.common.collect.Range; +import java.util.ArrayList; +import java.util.GregorianCalendar; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlSyntax; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.NlsString; +import org.apache.calcite.util.Sarg; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.RangeQueryBuilder; +import org.opensearch.sql.calcite.plan.OpenSearchConstants; + +/** + * Query predicate analyzer. Uses visitor pattern to traverse existing expression + * and convert it to {@link QueryBuilder} + */ +public class PredicateAnalyzer { + + /** + * Internal exception. + */ + @SuppressWarnings("serial") + private static final class PredicateAnalyzerException extends RuntimeException { + + PredicateAnalyzerException(String message) { + super(message); + } + + PredicateAnalyzerException(Throwable cause) { + super(cause); + } + } + + /** + * Exception that is thrown when a {@link org.apache.calcite.rel.RelNode} + * expression cannot be processed (or converted into an Elasticsearch query). + */ + public static class ExpressionNotAnalyzableException extends Exception { + ExpressionNotAnalyzableException(String message, Throwable cause) { + super(message, cause); + } + } + + private PredicateAnalyzer() {} + + /** + * Walks the expression tree, attempting to convert the entire tree into + * an equivalent OpenSearch query filter. If an error occurs, or if it + * is determined that the expression cannot be converted, an exception is + * thrown and an error message logged. + * + *

Callers should catch ExpressionNotAnalyzableException + * and fall back to not using push-down filters. + * + * @param expression expression to analyze + * @return search query which can be used to query OS cluster + * @throws ExpressionNotAnalyzableException when expression can't processed by this analyzer + */ + public static QueryBuilder analyze(RexNode expression, List mapping) throws ExpressionNotAnalyzableException { + requireNonNull(expression, "expression"); + try { + // visits expression tree + QueryExpression e = (QueryExpression) expression.accept(new Visitor(mapping)); + + if (e != null && e.isPartial()) { + throw new UnsupportedOperationException("Can't handle partial QueryExpression: " + e); + } + return e != null ? e.builder() : null; + } catch (Throwable e) { + Throwables.throwIfInstanceOf(e, UnsupportedOperationException.class); + throw new ExpressionNotAnalyzableException("Can't convert " + expression, e); + } + } + + /** + * Traverses {@link RexNode} tree and builds ES query. + */ + private static class Visitor extends RexVisitorImpl { + + List mapping; + private Visitor(List mapping) { + super(true); + this.mapping = mapping; + } + + @Override public Expression visitInputRef(RexInputRef inputRef) { + return new NamedFieldExpression(inputRef, mapping); + } + + @Override public Expression visitLiteral(RexLiteral literal) { + return new LiteralExpression(literal); + } + + private static boolean supportedRexCall(RexCall call) { + final SqlSyntax syntax = call.getOperator().getSyntax(); + switch (syntax) { + case BINARY: + switch (call.getKind()) { + case CONTAINS: + case AND: + case OR: + case LIKE: + case EQUALS: + case NOT_EQUALS: + case GREATER_THAN: + case GREATER_THAN_OR_EQUAL: + case LESS_THAN: + case LESS_THAN_OR_EQUAL: + return true; + default: + return false; + } + case SPECIAL: + switch (call.getKind()) { + case CAST: + case LIKE: + case ITEM: + case OTHER_FUNCTION: + return true; + case CASE: + case SIMILAR: + default: + return false; + } + case FUNCTION: + return true; + case POSTFIX: + switch (call.getKind()) { + case IS_NOT_NULL: + case IS_NULL: + return true; + default: + return false; + } + case PREFIX: // NOT() + switch (call.getKind()) { + case NOT: + return true; + default: + return false; + } + case INTERNAL: + switch (call.getKind()) { + case SEARCH: + return canBeTranslatedToTermsQuery(call); + default: + return false; + } + case FUNCTION_ID: + case FUNCTION_STAR: + default: + return false; + } + } + + /** + * There are three types of the Sarg included in SEARCH RexCall: + * 1) Sarg is points (In ('a', 'b', 'c' ...)). + * In this case the search call can be translated to terms Query + * 2) Sarg is complementedPoints (Not in ('a', 'b')). + * In this case the search call can be translated to MustNot terms Query + * 3) Sarg is real Range( > 1 and <= 10). + * In this case the search call should be translated to rang Query + * Currently only the 1) and 2) cases are supported. + * + * @param search SEARCH RexCall + * @return true if it isSearchWithPoints or isSearchWithComplementedPoints, other false + */ + static boolean canBeTranslatedToTermsQuery(RexCall search) { + return isSearchWithPoints(search) || isSearchWithComplementedPoints(search); + } + + static boolean isSearchWithPoints(RexCall search) { + RexLiteral literal = (RexLiteral) search.getOperands().get(1); + final Sarg sarg = requireNonNull(literal.getValueAs(Sarg.class), "Sarg"); + return sarg.isPoints(); + } + + static boolean isSearchWithComplementedPoints(RexCall search) { + RexLiteral literal = (RexLiteral) search.getOperands().get(1); + final Sarg sarg = requireNonNull(literal.getValueAs(Sarg.class), "Sarg"); + return sarg.isComplementedPoints(); + } + + @Override public Expression visitCall(RexCall call) { + + SqlSyntax syntax = call.getOperator().getSyntax(); + if (!supportedRexCall(call)) { + String message = String.format(Locale.ROOT, "Unsupported call: [%s]", call); + throw new PredicateAnalyzerException(message); + } + + switch (syntax) { + case BINARY, INTERNAL: + return binary(call); + case POSTFIX: + return postfix(call); + case PREFIX: + return prefix(call); + case SPECIAL: + return switch (call.getKind()) { + case CAST -> toCastExpression(call); + case LIKE, CONTAINS -> binary(call); + default -> { + String message = String.format(Locale.ROOT, "Unsupported call: [%s]", call); + throw new PredicateAnalyzerException(message); + } + }; + case FUNCTION: + if (call.getOperator().getName().equalsIgnoreCase("CONTAINS")) { + List operands = visitList(call.getOperands()); + String query = + convertQueryString(operands.subList(0, operands.size() - 1), + operands.get(operands.size() - 1)); + return QueryExpression.create(new NamedFieldExpression()).queryString(query); + } + // fall through + default: + String message = + format(Locale.ROOT, + "Unsupported syntax [%s] for call: [%s]", syntax, call); + throw new PredicateAnalyzerException(message); + } + } + + private static String convertQueryString(List fields, Expression query) { + int index = 0; + checkArgument(query instanceof LiteralExpression, + "Query string must be a string literal"); + String queryString = ((LiteralExpression) query).stringValue(); + @SuppressWarnings("ModifiedButNotUsed") + Map fieldMap = new LinkedHashMap<>(); + for (Expression expr : fields) { + if (expr instanceof NamedFieldExpression) { + NamedFieldExpression field = (NamedFieldExpression) expr; + String fieldIndexString = String.format(Locale.ROOT, "$%d", index++); + fieldMap.put(fieldIndexString, field.getReference()); + } + } + try { + return queryString; + } catch (Exception e) { + throw new PredicateAnalyzerException(e); + } + } + + private QueryExpression prefix(RexCall call) { + checkArgument(call.getKind() == SqlKind.NOT, + "Expected %s got %s", SqlKind.NOT, call.getKind()); + + if (call.getOperands().size() != 1) { + String message = String.format(Locale.ROOT, "Unsupported NOT operator: [%s]", call); + throw new PredicateAnalyzerException(message); + } + + QueryExpression expr = (QueryExpression) call.getOperands().get(0).accept(this); + return expr.not(); + } + + private QueryExpression postfix(RexCall call) { + checkArgument(call.getKind() == SqlKind.IS_NULL + || call.getKind() == SqlKind.IS_NOT_NULL); + if (call.getOperands().size() != 1) { + String message = String.format(Locale.ROOT, "Unsupported operator: [%s]", call); + throw new PredicateAnalyzerException(message); + } + Expression a = call.getOperands().get(0).accept(this); + // Elasticsearch does not want is null/is not null (exists query) + // for _id and _index, although it supports for all other metadata column + isColumn(a, call, OpenSearchConstants.METADATA_FIELD_ID, true); + isColumn(a, call, OpenSearchConstants.METADATA_FIELD_INDEX, true); + QueryExpression operand = QueryExpression.create((TerminalExpression) a); + return call.getKind() == SqlKind.IS_NOT_NULL ? operand.exists() : operand.notExists(); + } + + /** + * Process a call which is a binary operation, transforming into an equivalent + * query expression. Note that the incoming call may be either a simple binary + * expression, such as {@code foo > 5}, or it may be several simple expressions connected + * by {@code AND} or {@code OR} operators, such as {@code foo > 5 AND bar = 'abc' AND 'rot' < 1} + * + * @param call existing call + * @return evaluated expression + */ + private QueryExpression binary(RexCall call) { + + // if AND/OR, do special handling + if (call.getKind() == SqlKind.AND || call.getKind() == SqlKind.OR) { + return andOr(call); + } + + checkForIncompatibleDateTimeOperands(call); + + checkState(call.getOperands().size() == 2); + final Expression a = call.getOperands().get(0).accept(this); + final Expression b = call.getOperands().get(1).accept(this); + + final SwapResult pair = swap(a, b); + final boolean swapped = pair.isSwapped(); + + // For _id and _index columns, only equals/not_equals work! + if (isColumn(pair.getKey(), call, OpenSearchConstants.METADATA_FIELD_ID, false) + || isColumn(pair.getKey(), call, OpenSearchConstants.METADATA_FIELD_INDEX, false) + || isColumn(pair.getKey(), call, OpenSearchConstants.METADATA_FIELD_UID, false)) { + switch (call.getKind()) { + case EQUALS: + case NOT_EQUALS: + break; + default: + throw new PredicateAnalyzerException( + "Cannot handle " + call.getKind() + " expression for _id field, " + call); + } + } + + switch (call.getKind()) { + case CONTAINS: + return QueryExpression.create(pair.getKey()).contains(pair.getValue()); + case LIKE: + throw new UnsupportedOperationException("LIKE not yet supported"); + case EQUALS: + return QueryExpression.create(pair.getKey()).equals(pair.getValue()); + case NOT_EQUALS: + return QueryExpression.create(pair.getKey()).notEquals(pair.getValue()); + case GREATER_THAN: + if (swapped) { + return QueryExpression.create(pair.getKey()).lt(pair.getValue()); + } + return QueryExpression.create(pair.getKey()).gt(pair.getValue()); + case GREATER_THAN_OR_EQUAL: + if (swapped) { + return QueryExpression.create(pair.getKey()).lte(pair.getValue()); + } + return QueryExpression.create(pair.getKey()).gte(pair.getValue()); + case LESS_THAN: + if (swapped) { + return QueryExpression.create(pair.getKey()).gt(pair.getValue()); + } + return QueryExpression.create(pair.getKey()).lt(pair.getValue()); + case LESS_THAN_OR_EQUAL: + if (swapped) { + return QueryExpression.create(pair.getKey()).gte(pair.getValue()); + } + return QueryExpression.create(pair.getKey()).lte(pair.getValue()); + case SEARCH: + if (isSearchWithComplementedPoints(call)) { + return QueryExpression.create(pair.getKey()).notIn(pair.getValue()); + } else { + return QueryExpression.create(pair.getKey()).in(pair.getValue()); + } + default: + break; + } + String message = String.format(Locale.ROOT, "Unable to handle call: [%s]", call); + throw new PredicateAnalyzerException(message); + } + + private QueryExpression andOr(RexCall call) { + QueryExpression[] expressions = new QueryExpression[call.getOperands().size()]; + PredicateAnalyzerException firstError = null; + boolean partial = false; + for (int i = 0; i < call.getOperands().size(); i++) { + try { + Expression expr = call.getOperands().get(i).accept(this); + if (expr instanceof NamedFieldExpression) { + // nop currently + } else { + expressions[i] = (QueryExpression) call.getOperands().get(i).accept(this); + } + partial |= expressions[i].isPartial(); + } catch (PredicateAnalyzerException e) { + if (firstError == null) { + firstError = e; + } + partial = true; + } + } + + switch (call.getKind()) { + case OR: + if (partial) { + if (firstError != null) { + throw firstError; + } else { + final String message = String.format(Locale.ROOT, "Unable to handle call: [%s]", call); + throw new PredicateAnalyzerException(message); + } + } + return CompoundQueryExpression.or(expressions); + case AND: + return CompoundQueryExpression.and(partial, expressions); + default: + String message = String.format(Locale.ROOT, "Unable to handle call: [%s]", call); + throw new PredicateAnalyzerException(message); + } + } + + /** + * Holder class for a pair of expressions. Used to convert {@code 1 = foo} into {@code foo = 1} + */ + private static class SwapResult { + final boolean swapped; + final TerminalExpression terminal; + final LiteralExpression literal; + + SwapResult(boolean swapped, TerminalExpression terminal, LiteralExpression literal) { + super(); + this.swapped = swapped; + this.terminal = terminal; + this.literal = literal; + } + + TerminalExpression getKey() { + return terminal; + } + + LiteralExpression getValue() { + return literal; + } + + boolean isSwapped() { + return swapped; + } + } + + /** + * Swap order of operands such that the literal expression is always on the right. + * + *

NOTE: Some combinations of operands are implicitly not supported and will + * cause an exception to be thrown. For example, we currently do not support + * comparing a literal to another literal as convention {@code 5 = 5}. Nor do we support + * comparing named fields to other named fields as convention {@code $0 = $1}. + * + * @param left left expression + * @param right right expression + */ + private static SwapResult swap(Expression left, Expression right) { + + TerminalExpression terminal; + LiteralExpression literal = expressAsLiteral(left); + boolean swapped = false; + if (literal != null) { + swapped = true; + terminal = (TerminalExpression) right; + } else { + literal = expressAsLiteral(right); + terminal = (TerminalExpression) left; + } + + if (literal == null || terminal == null) { + String message = + String.format(Locale.ROOT, + "Unexpected combination of expressions [left: %s] [right: %s]", + left, right); + throw new PredicateAnalyzerException(message); + } + + if (CastExpression.isCastExpression(terminal)) { + terminal = CastExpression.unpack(terminal); + } + + return new SwapResult(swapped, terminal, literal); + } + + private CastExpression toCastExpression(RexCall call) { + TerminalExpression argument = (TerminalExpression) call.getOperands().get(0).accept(this); + return new CastExpression(call.getType(), argument); + } + + private static NamedFieldExpression toNamedField(RexLiteral literal) { + return new NamedFieldExpression(literal); + } + + /** + * Try to convert a generic expression into a literal expression. + */ + private static LiteralExpression expressAsLiteral(Expression exp) { + + if (exp instanceof LiteralExpression) { + return (LiteralExpression) exp; + } + + return null; + } + + private static boolean isColumn(Expression exp, RexNode node, + String columnName, boolean throwException) { + if (!(exp instanceof NamedFieldExpression)) { + return false; + } + + final NamedFieldExpression termExp = (NamedFieldExpression) exp; + if (columnName.equals(termExp.getRootName())) { + if (throwException) { + throw new PredicateAnalyzerException("Cannot handle _id field in " + node); + } + return true; + } + return false; + } + } + + /** + * Empty interface; exists only to define the type hierarchy. + */ + interface Expression { + } + + /** + * Main expression operators (like {@code equals}, {@code gt}, {@code exists} etc.) + */ + abstract static class QueryExpression implements Expression { + + public abstract QueryBuilder builder(); + + public boolean isPartial() { + return false; + } + + public abstract QueryExpression contains(LiteralExpression literal); + + /** + * Negate {@code this} QueryExpression (not the next one). + */ + public abstract QueryExpression not(); + + public abstract QueryExpression exists(); + + public abstract QueryExpression notExists(); + + public abstract QueryExpression like(LiteralExpression literal); + + public abstract QueryExpression notLike(LiteralExpression literal); + + public abstract QueryExpression equals(LiteralExpression literal); + + public abstract QueryExpression in(LiteralExpression literal); + + public abstract QueryExpression notIn(LiteralExpression literal); + + public abstract QueryExpression notEquals(LiteralExpression literal); + + public abstract QueryExpression gt(LiteralExpression literal); + + public abstract QueryExpression gte(LiteralExpression literal); + + public abstract QueryExpression lt(LiteralExpression literal); + + public abstract QueryExpression lte(LiteralExpression literal); + + public abstract QueryExpression queryString(String query); + + public abstract QueryExpression isTrue(); + + public static QueryExpression create(TerminalExpression expression) { + if (expression instanceof CastExpression) { + expression = CastExpression.unpack(expression); + } + + if (expression instanceof NamedFieldExpression) { + return new SimpleQueryExpression((NamedFieldExpression) expression); + } else { + String message = String.format(Locale.ROOT, "Unsupported expression: [%s]", expression); + throw new PredicateAnalyzerException(message); + } + } + + } + + /** + * Builds conjunctions / disjunctions based on existing expressions. + */ + static class CompoundQueryExpression extends QueryExpression { + + private final boolean partial; + private final BoolQueryBuilder builder; + + public static CompoundQueryExpression or(QueryExpression... expressions) { + CompoundQueryExpression bqe = new CompoundQueryExpression(false); + for (QueryExpression expression : expressions) { + bqe.builder.should(expression.builder()); + } + return bqe; + } + + /** + * If partial expression, we will need to complete it with a full filter. + * + * @param partial whether we partially converted a and for push down purposes + * @param expressions list of expressions to join with {@code and} boolean + * @return new instance of expression + */ + public static CompoundQueryExpression and(boolean partial, QueryExpression... expressions) { + CompoundQueryExpression bqe = new CompoundQueryExpression(partial); + for (QueryExpression expression : expressions) { + if (expression != null) { // partial expressions have nulls for missing nodes + bqe.builder.must(expression.builder()); + } + } + return bqe; + } + + private CompoundQueryExpression(boolean partial) { + this(partial, boolQuery()); + } + + private CompoundQueryExpression(boolean partial, BoolQueryBuilder builder) { + this.partial = partial; + this.builder = requireNonNull(builder, "builder"); + } + + @Override public boolean isPartial() { + return partial; + } + + + @Override public QueryBuilder builder() { + return builder; + } + + @Override public QueryExpression not() { + return new CompoundQueryExpression(partial, boolQuery().mustNot(builder())); + } + + @Override public QueryExpression exists() { + throw new PredicateAnalyzerException("SqlOperatorImpl ['exists'] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression contains(LiteralExpression literal) { + throw new PredicateAnalyzerException("SqlOperatorImpl ['contains'] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression notExists() { + throw new PredicateAnalyzerException("SqlOperatorImpl ['notExists'] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression like(LiteralExpression literal) { + throw new PredicateAnalyzerException("SqlOperatorImpl ['like'] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression notLike(LiteralExpression literal) { + throw new PredicateAnalyzerException("SqlOperatorImpl ['notLike'] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression equals(LiteralExpression literal) { + throw new PredicateAnalyzerException("SqlOperatorImpl ['='] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression notEquals(LiteralExpression literal) { + throw new PredicateAnalyzerException("SqlOperatorImpl ['not'] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression gt(LiteralExpression literal) { + throw new PredicateAnalyzerException("SqlOperatorImpl ['>'] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression gte(LiteralExpression literal) { + throw new PredicateAnalyzerException("SqlOperatorImpl ['>='] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression lt(LiteralExpression literal) { + throw new PredicateAnalyzerException("SqlOperatorImpl ['<'] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression lte(LiteralExpression literal) { + throw new PredicateAnalyzerException("SqlOperatorImpl ['<='] " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression queryString(String query) { + throw new PredicateAnalyzerException("QueryString " + + "cannot be applied to a compound expression"); + } + + @Override public QueryExpression isTrue() { + throw new PredicateAnalyzerException("isTrue cannot be applied to a compound expression"); + } + + @Override public QueryExpression in(LiteralExpression literal) { + throw new PredicateAnalyzerException("in cannot be applied to a compound expression"); + } + + @Override public QueryExpression notIn(LiteralExpression literal) { + throw new PredicateAnalyzerException("notIn cannot be applied to a compound expression"); + } + } + + /** + * Usually basic expression of type {@code a = 'val'} or {@code b > 42}. + */ + static class SimpleQueryExpression extends QueryExpression { + + private final NamedFieldExpression rel; + private QueryBuilder builder; + + private String getFieldReference() { + return rel.getReference(); + } + + private SimpleQueryExpression(NamedFieldExpression rel) { + this.rel = rel; + } + + @Override public QueryBuilder builder() { + if (builder == null) { + throw new IllegalStateException("Builder was not initialized"); + } + return builder; + } + + @Override public QueryExpression not() { + builder = boolQuery().mustNot(builder()); + return this; + } + + @Override public QueryExpression exists() { + builder = existsQuery(getFieldReference()); + return this; + } + + @Override public QueryExpression notExists() { + // Even though Lucene doesn't allow a stand alone mustNot boolean query, + // Elasticsearch handles this problem transparently on its end + builder = boolQuery().mustNot(existsQuery(getFieldReference())); + return this; + } + + @Override public QueryExpression like(LiteralExpression literal) { + builder = regexpQuery(getFieldReference(), literal.stringValue()); + return this; + } + + @Override public QueryExpression contains(LiteralExpression literal) { + builder = matchQuery(getFieldReference(), literal.value()); + return this; + } + + @Override public QueryExpression notLike(LiteralExpression literal) { + builder = boolQuery() + // NOT LIKE should return false when field is NULL + .must(existsQuery(getFieldReference())) + .mustNot(regexpQuery(getFieldReference(), literal.stringValue())); + return this; + } + + @Override public QueryExpression equals(LiteralExpression literal) { + Object value = literal.value(); + if (value instanceof GregorianCalendar) { + builder = boolQuery() + .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gte(value))) + .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lte(value))); + } else { + builder = termQuery(getFieldReference(), value); + } + return this; + } + + @Override public QueryExpression notEquals(LiteralExpression literal) { + Object value = literal.value(); + if (value instanceof GregorianCalendar) { + builder = boolQuery() + .should(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gt(value))) + .should(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lt(value))); + } else { + builder = boolQuery() + // NOT LIKE should return false when field is NULL + .must(existsQuery(getFieldReference())) + .mustNot(termQuery(getFieldReference(), value)); + } + return this; + } + + @Override public QueryExpression gt(LiteralExpression literal) { + Object value = literal.value(); + builder = + addFormatIfNecessary(literal, + rangeQuery(getFieldReference()).gt(value)); + return this; + } + + @Override public QueryExpression gte(LiteralExpression literal) { + Object value = literal.value(); + builder = addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gte(value)); + return this; + } + + @Override public QueryExpression lt(LiteralExpression literal) { + Object value = literal.value(); + builder = addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lt(value)); + return this; + } + + @Override public QueryExpression lte(LiteralExpression literal) { + Object value = literal.value(); + builder = addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lte(value)); + return this; + } + + @Override public QueryExpression queryString(String query) { + throw new UnsupportedOperationException("QueryExpression not yet supported: " + query); + } + + @Override public QueryExpression isTrue() { + builder = termQuery(getFieldReference(), true); + return this; + } + + @Override public QueryExpression in(LiteralExpression literal) { + Iterable iterable = (Iterable) literal.value(); + builder = termsQuery(getFieldReference(), iterable); + return this; + } + + @Override public QueryExpression notIn(LiteralExpression literal) { + Iterable iterable = (Iterable) literal.value(); + builder = boolQuery().mustNot(termsQuery(getFieldReference(), iterable)); + return this; + } + } + + + /** + * By default, range queries on date/time need use the format of the source to parse the literal. + * So we need to specify that the literal has "date_time" format + * + * @param literal literal value + * @param rangeQueryBuilder query builder to optionally add {@code format} expression + * @return existing builder with possible {@code format} attribute + */ + private static RangeQueryBuilder addFormatIfNecessary(LiteralExpression literal, + RangeQueryBuilder rangeQueryBuilder) { + if (literal.value() instanceof GregorianCalendar) { + rangeQueryBuilder.format("date_time"); + } + return rangeQueryBuilder; + } + + /** + * Empty interface; exists only to define the type hierarchy. + */ + interface TerminalExpression extends Expression { + } + + /** + * SQL cast. For example, {@code cast(col as INTEGER)}. + */ + static final class CastExpression implements TerminalExpression { + @SuppressWarnings("unused") + private final RelDataType type; + private final TerminalExpression argument; + + private CastExpression(RelDataType type, TerminalExpression argument) { + this.type = type; + this.argument = argument; + } + + public boolean isCastFromLiteral() { + return argument instanceof LiteralExpression; + } + + static TerminalExpression unpack(TerminalExpression exp) { + if (!(exp instanceof CastExpression)) { + return exp; + } + return ((CastExpression) exp).argument; + } + + static boolean isCastExpression(Expression exp) { + return exp instanceof CastExpression; + } + + } + + /** + * Used for bind variables. + */ + static final class NamedFieldExpression implements TerminalExpression { + + private final String name; + + private NamedFieldExpression() { + this.name = null; + } + + private NamedFieldExpression(RexInputRef ref, List mapping) { + this.name = (ref == null || ref.getIndex() >= mapping.size()) ? null : mapping.get(ref.getIndex()); + } + + private NamedFieldExpression(RexLiteral literal) { + this.name = literal == null ? null : RexLiteral.stringValue(literal); + } + + String getRootName() { + return name; + } + + boolean isMetaField() { + return OpenSearchConstants.METADATAFIELD_TYPE_MAP.containsKey(getRootName()); + } + + String getReference() { + return getRootName(); + } + } + + /** + * Literal like {@code 'foo' or 42 or true} etc. + */ + static final class LiteralExpression implements TerminalExpression { + + final RexLiteral literal; + + LiteralExpression(RexLiteral literal) { + this.literal = literal; + } + + Object value() { + + if (isSarg()) { + return sargValue(); + } else if (isIntegral()) { + return longValue(); + } else if (isFloatingPoint()) { + return doubleValue(); + } else if (isBoolean()) { + return booleanValue(); + } else if (isString()) { + return RexLiteral.stringValue(literal); + } else { + return rawValue(); + } + } + + boolean isIntegral() { + return SqlTypeName.INT_TYPES.contains(literal.getType().getSqlTypeName()); + } + + boolean isFloatingPoint() { + return SqlTypeName.APPROX_TYPES.contains(literal.getType().getSqlTypeName()); + } + + boolean isBoolean() { + return SqlTypeName.BOOLEAN_TYPES.contains(literal.getType().getSqlTypeName()); + } + + public boolean isString() { + return SqlTypeName.CHAR_TYPES.contains(literal.getType().getSqlTypeName()); + } + + public boolean isSarg() { + return SqlTypeName.SARG.getName().equalsIgnoreCase(literal.getTypeName().getName()); + } + + long longValue() { + return ((Number) literal.getValue()).longValue(); + } + + double doubleValue() { + return ((Number) literal.getValue()).doubleValue(); + } + + boolean booleanValue() { + return RexLiteral.booleanValue(literal); + } + + String stringValue() { + return RexLiteral.stringValue(literal); + } + + List sargValue() { + final Sarg sarg = requireNonNull(literal.getValueAs(Sarg.class), "Sarg"); + final RelDataType type = literal.getType(); + List values = new ArrayList<>(); + final SqlTypeName sqlTypeName = type.getSqlTypeName(); + if (sarg.isPoints()) { + Set ranges = sarg.rangeSet.asRanges(); + ranges.forEach(range -> + values.add(sargPointValue(range.lowerEndpoint(), sqlTypeName))); + } else if (sarg.isComplementedPoints()) { + Set ranges = sarg.negate().rangeSet.asRanges(); + ranges.forEach(range -> + values.add(sargPointValue(range.lowerEndpoint(), sqlTypeName))); + } + return values; + } + + Object sargPointValue(Object point, SqlTypeName sqlTypeName) { + switch (sqlTypeName) { + case CHAR: + case VARCHAR: + return ((NlsString) point).getValue(); + default: + return point; + } + } + + Object rawValue() { + return literal.getValue(); + } + } + + /** + * If one operand in a binary operator is a DateTime type, but the other isn't, + * we should not push down the predicate. + * + * @param call Current node being evaluated + */ + private static void checkForIncompatibleDateTimeOperands(RexCall call) { + RelDataType op1 = call.getOperands().get(0).getType(); + RelDataType op2 = call.getOperands().get(1).getType(); + if ((SqlTypeFamily.DATETIME.contains(op1) && !SqlTypeFamily.DATETIME.contains(op2)) + || (SqlTypeFamily.DATETIME.contains(op2) && !SqlTypeFamily.DATETIME.contains(op1)) + || (SqlTypeFamily.DATE.contains(op1) && !SqlTypeFamily.DATE.contains(op2)) + || (SqlTypeFamily.DATE.contains(op2) && !SqlTypeFamily.DATE.contains(op1)) + || (SqlTypeFamily.TIMESTAMP.contains(op1) && !SqlTypeFamily.TIMESTAMP.contains(op2)) + || (SqlTypeFamily.TIMESTAMP.contains(op2) && !SqlTypeFamily.TIMESTAMP.contains(op1)) + || (SqlTypeFamily.TIME.contains(op1) && !SqlTypeFamily.TIME.contains(op2)) + || (SqlTypeFamily.TIME.contains(op2) && !SqlTypeFamily.TIME.contains(op1))) { + throw new PredicateAnalyzerException("Cannot handle " + call.getKind() + + " expression for _id field, " + call); + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index f4085fac33..7e328beac8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -6,6 +6,7 @@ package org.opensearch.sql.opensearch.storage; import com.google.common.annotations.VisibleForTesting; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -16,8 +17,10 @@ import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.rex.RexNode; import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.sql.calcite.plan.OpenSearchTable; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.type.ExprCoreType; @@ -30,6 +33,8 @@ import org.opensearch.sql.opensearch.planner.physical.MLOperator; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException; import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest; import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexEnumerator; import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScan; @@ -76,6 +81,8 @@ public class OpenSearchIndex extends OpenSearchTable { /** The cached ExprType of fields. */ private Map cachedFieldTypes = null; + private List mapping = null; + /** The cached max result window setting of index. */ private Integer cachedMaxResultWindow = null; @@ -128,6 +135,7 @@ public Map getFieldTypes() { LinkedHashMap::new, (map, item) -> map.put(item.getKey(), item.getValue().getExprType()), Map::putAll); + mapping = cachedFieldTypes.keySet().stream().toList(); } return cachedFieldTypes; } @@ -204,6 +212,37 @@ public Enumerable scan(DataContext root) { }; } + @Override + public Enumerable scan(DataContext dataContext, List list, + int @Nullable [] ints) { + return new AbstractEnumerable() { + @Override + public Enumerator enumerator() { + final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT); + + final TimeValue cursorKeepAlive = + settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); + var builder = + new OpenSearchRequestBuilder(querySizeLimit, createExprValueFactory(), settings); + if (!list.isEmpty()) { + try { + QueryBuilder filter = PredicateAnalyzer.analyze(list.getFirst(), mapping); + builder.pushDownFilter(filter); + } catch (ExpressionNotAnalyzableException e) { + throw new RuntimeException(e); + } + } + if (ints != null) { + builder.pushDownProjectStream(Arrays.stream(ints).mapToObj(i -> mapping.get(i))); + } + return new OpenSearchIndexEnumerator( + client, + builder.getMaxResponseSize(), + builder.build(indexName, getMaxResultWindow(), cursorKeepAlive, client)); + } + }; + } + @VisibleForTesting @RequiredArgsConstructor public static class OpenSearchDefaultImplementor extends DefaultImplementor { @@ -231,10 +270,10 @@ public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) { } @Override - public Enumerable search() { - return new AbstractEnumerable() { + public Enumerable search() { + return new AbstractEnumerable() { @Override - public Enumerator enumerator() { + public Enumerator enumerator() { final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT); final TimeValue cursorKeepAlive = diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index 407c945fee..3886c38267 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -16,7 +16,7 @@ import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; -public class OpenSearchIndexEnumerator implements Enumerator { +public class OpenSearchIndexEnumerator implements Enumerator { /** OpenSearch client. */ private final OpenSearchClient client; @@ -58,7 +58,7 @@ private void fetchNextBatch() { } @Override - public Object current() { + public Object[] current() { Object[] p = fields.stream().map(k -> current.tupleValue().get(k).value()).toArray(); return p; } From 31c89c46f197a060de2f33605b76696cebc61e16 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Mon, 17 Feb 2025 18:17:50 +0800 Subject: [PATCH 2/4] Support Filter and Project pushdown v2 Signed-off-by: Heng Qian --- .../sql/calcite/CalcitePlanContext.java | 7 +- .../sql/calcite/plan/OpenSearchTable.java | 14 +- .../sql/calcite/plan/OpenSearchTableScan.java | 36 +- .../opensearch/sql/executor/QueryService.java | 39 +- opensearch/build.gradle | 3 + .../executor/OpenSearchExecutionEngine.java | 13 +- .../OpenSearchFilterIndexScanRule.java | 83 +++ .../physical/OpenSearchIndexRules.java | 23 + .../OpenSearchProjectIndexScanRule.java | 114 ++++ .../opensearch/request/PredicateAnalyzer.java | 636 +++++++++--------- .../opensearch/storage/OpenSearchIndex.java | 83 +-- .../scan/CalciteOpenSearchIndexScan.java | 148 ++++ .../scan/OpenSearchIndexEnumerator.java | 2 +- 13 files changed, 741 insertions(+), 460 deletions(-) create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 9efbded7bc..e89d006eaa 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -7,7 +7,6 @@ import java.util.function.BiFunction; import lombok.Getter; -import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.RelBuilder; @@ -16,15 +15,13 @@ public class CalcitePlanContext { public FrameworkConfig config; - public CalciteConnection connection; public final RelBuilder relBuilder; public final ExtendedRexBuilder rexBuilder; @Getter private boolean isResolvingJoinCondition = false; - public CalcitePlanContext(FrameworkConfig config, CalciteConnection connection) { + public CalcitePlanContext(FrameworkConfig config) { this.config = config; - this.connection = connection; this.relBuilder = RelBuilder.create(config); this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder()); } @@ -40,6 +37,6 @@ public RexNode resolveJoinCondition( // for testing only public static CalcitePlanContext create(FrameworkConfig config) { - return new CalcitePlanContext(config, null); + return new CalcitePlanContext(config); } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java index ac54ffa46e..92891ede84 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java @@ -11,19 +11,15 @@ import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.linq4j.Queryable; import org.apache.calcite.linq4j.tree.Expression; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.ProjectableFilterableTable; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Schemas; import org.apache.calcite.schema.TranslatableTable; import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes; public abstract class OpenSearchTable extends AbstractQueryableTable - implements ProjectableFilterableTable, TranslatableTable, org.opensearch.sql.storage.Table { + implements TranslatableTable, org.opensearch.sql.storage.Table { protected OpenSearchTable(Type elementType) { super(elementType); @@ -34,12 +30,6 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { return OpenSearchRelDataTypes.convertSchema(this); } - @Override - public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { - final RelOptCluster cluster = context.getCluster(); - return new OpenSearchTableScan(cluster, relOptTable, this); - } - @Override public Queryable asQueryable( QueryProvider queryProvider, SchemaPlus schema, String tableName) { @@ -56,5 +46,5 @@ public Expression getExpression(SchemaPlus schema, String tableName, Class clazz return Schemas.tableExpression(schema, getElementType(), tableName, clazz); } - public abstract Enumerable search(); + public abstract Enumerable search(); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTableScan.java b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTableScan.java index f1cd745b4c..03678713ef 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTableScan.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTableScan.java @@ -5,46 +5,26 @@ package org.opensearch.sql.calcite.plan; -import static java.util.Objects.requireNonNull; - import com.google.common.collect.ImmutableList; -import java.util.List; import org.apache.calcite.adapter.enumerable.EnumerableConvention; import org.apache.calcite.adapter.enumerable.EnumerableRel; -import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; -import org.apache.calcite.adapter.enumerable.PhysType; -import org.apache.calcite.adapter.enumerable.PhysTypeImpl; -import org.apache.calcite.linq4j.tree.Blocks; -import org.apache.calcite.linq4j.tree.Expressions; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.rules.CoreRules; /** Relational expression representing a scan of an OpenSearch type. */ -public class OpenSearchTableScan extends TableScan implements EnumerableRel { - private final OpenSearchTable osTable; - +public abstract class OpenSearchTableScan extends TableScan implements EnumerableRel { /** * Creates an OpenSearchTableScan. * * @param cluster Cluster * @param table Table - * @param osTable OpenSearch table */ - OpenSearchTableScan(RelOptCluster cluster, RelOptTable table, OpenSearchTable osTable) { + protected OpenSearchTableScan(RelOptCluster cluster, RelOptTable table) { super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), ImmutableList.of(), table); - this.osTable = requireNonNull(osTable, "OpenSearch table"); - } - - @Override - public RelNode copy(RelTraitSet traitSet, List inputs) { - assert inputs.isEmpty(); - return new OpenSearchTableScan(getCluster(), table, osTable); } @Override @@ -57,16 +37,4 @@ public void register(RelOptPlanner planner) { // it is converted to cardinality aggregation in OpenSearch planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES); } - - @Override - public Result implement(EnumerableRelImplementor implementor, Prefer pref) { - PhysType physType = - PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray()); - - return implementor.result( - physType, - Blocks.toBlock( - Expressions.call( - requireNonNull(table.getExpression(OpenSearchTable.class)), "search"))); - } } diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index b08ae2e567..cb51413ebe 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -13,10 +13,7 @@ import java.util.List; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; -import org.apache.calcite.jdbc.CalciteConnection; -import org.apache.calcite.jdbc.CalciteJdbc41Factory; import org.apache.calcite.jdbc.CalciteSchema; -import org.apache.calcite.jdbc.Driver; import org.apache.calcite.plan.RelTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; @@ -80,35 +77,14 @@ public void execute( AccessController.doPrivileged( (PrivilegedAction) () -> { - // Use simple calcite schema since we don't compute tables in advance of the - // query. - CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false); - CalciteJdbc41Factory factory = new CalciteJdbc41Factory(); - CalciteConnection connection = - factory.newConnection( - new Driver(), - factory, - "", - new java.util.Properties(), - rootSchema, - null); - final SchemaPlus defaultSchema = - connection - .getRootSchema() - .add( - OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, - new OpenSearchSchema(dataSourceService)); - // Set opensearch schema as the default schema in config, otherwise we need to - // explicitly - // add schema path 'OpenSearch' before the opensearch table name - final FrameworkConfig config = buildFrameworkConfig(defaultSchema); - final CalcitePlanContext context = new CalcitePlanContext(config, connection); + final FrameworkConfig config = buildFrameworkConfig(); + final CalcitePlanContext context = new CalcitePlanContext(config); executePlanByCalcite(analyze(plan, context), context, listener); return null; }); } catch (Exception e) { LOG.warn("Fallback to V2 query engine since got exception", e); - executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); + // executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);} } } } catch (Exception e) { @@ -174,10 +150,15 @@ public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) { return relNodeVisitor.analyze(plan, context); } - private FrameworkConfig buildFrameworkConfig(SchemaPlus defaultSchema) { + private FrameworkConfig buildFrameworkConfig() { + // Use simple calcite schema since we don't compute tables in advance of the query. + final SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus(); + final SchemaPlus opensearchSchema = + rootSchema.add( + OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, new OpenSearchSchema(dataSourceService)); return Frameworks.newConfigBuilder() .parserConfig(SqlParser.Config.DEFAULT) // TODO check - .defaultSchema(defaultSchema) + .defaultSchema(opensearchSchema) .traitDefs((List) null) .programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE)) .typeSystem(OpenSearchTypeSystem.INSTANCE) diff --git a/opensearch/build.gradle b/opensearch/build.gradle index f342c21164..62a774f11f 100644 --- a/opensearch/build.gradle +++ b/opensearch/build.gradle @@ -41,6 +41,9 @@ dependencies { compileOnly group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}" implementation group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" + annotationProcessor 'org.immutables:value:2.8.8' + compileOnly 'org.immutables:value-annotations:2.8.8' + testImplementation('org.junit.jupiter:junit-jupiter-api:5.9.3') testImplementation('org.junit.jupiter:junit-jupiter-params:5.9.3') testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.9.3') diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 9d5702ead9..43b6f3b9cf 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -7,7 +7,6 @@ import java.security.AccessController; import java.security.PrivilegedAction; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -18,7 +17,7 @@ import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.tools.RelRunner; +import org.apache.calcite.tools.RelRunners; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.data.model.ExprTupleValue; @@ -111,13 +110,9 @@ public void execute( AccessController.doPrivileged( (PrivilegedAction) () -> { - Connection connection = context.connection; - try { - RelRunner relRunner = connection.unwrap(RelRunner.class); - try (PreparedStatement statement = relRunner.prepareStatement(rel)) { - ResultSet resultSet = statement.executeQuery(); - buildResultSet(resultSet, listener); - } + try (PreparedStatement statement = RelRunners.run(rel)) { + ResultSet result = statement.executeQuery(); + buildResultSet(result, listener); return null; } catch (SQLException e) { throw new RuntimeException(e); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java new file mode 100644 index 0000000000..6ca00acb16 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.opensearch.sql.opensearch.planner.physical; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Project; +import org.immutables.value.Value; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex; +import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan; + +/** Planner rule that push a {@link Project} down to {@link CalciteOpenSearchIndexScan} */ +@Value.Enclosing +public class OpenSearchFilterIndexScanRule extends RelRule { + + /** Creates a OpenSearchFilterIndexScanRule. */ + protected OpenSearchFilterIndexScanRule(Config config) { + super(config); + } + + // ~ Methods ---------------------------------------------------------------- + + protected static boolean test(CalciteOpenSearchIndexScan scan) { + final RelOptTable table = scan.getTable(); + return table.unwrap(OpenSearchIndex.class) != null; + } + + @Override + public void onMatch(RelOptRuleCall call) { + if (call.rels.length == 2) { + // the ordinary variant + final Filter filter = call.rel(0); + final CalciteOpenSearchIndexScan scan = call.rel(1); + apply(call, filter, scan); + } else { + throw new AssertionError(); + } + } + + protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) { + if (scan.pushDownFilter(filter)) { + call.transformTo(scan); + } + } + + /** Rule configuration. */ + @Value.Immutable + public interface Config extends RelRule.Config { + /** Config that matches Project on CalciteOpenSearchIndexScan. */ + Config DEFAULT = + ImmutableOpenSearchFilterIndexScanRule.Config.builder() + .build() + .withOperandSupplier( + b0 -> + b0.operand(Filter.class) + .oneInput( + b1 -> + b1.operand(CalciteOpenSearchIndexScan.class) + .predicate(OpenSearchFilterIndexScanRule::test) + .noInputs())); + + @Override + default OpenSearchFilterIndexScanRule toRule() { + return new OpenSearchFilterIndexScanRule(this); + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java new file mode 100644 index 0000000000..a1ee5b787d --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.planner.physical; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.calcite.plan.RelOptRule; + +public class OpenSearchIndexRules { + private static final OpenSearchProjectIndexScanRule PROJECT_INDEX_SCAN = + OpenSearchProjectIndexScanRule.Config.DEFAULT.toRule(); + private static final OpenSearchFilterIndexScanRule FILTER_INDEX_SCAN = + OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule(); + + public static final List OPEN_SEARCH_INDEX_SCAN_RULES = + ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN); + + // prevent instantiation + private OpenSearchIndexRules() {} +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java new file mode 100644 index 0000000000..ca847e5f82 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.opensearch.sql.opensearch.planner.physical; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.util.mapping.Mapping; +import org.apache.calcite.util.mapping.Mappings; +import org.immutables.value.Value; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex; +import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan; + +/** Planner rule that push a {@link Project} down to {@link CalciteOpenSearchIndexScan} */ +@Value.Enclosing +public class OpenSearchProjectIndexScanRule extends RelRule { + + /** Creates a OpenSearchProjectIndexScanRule. */ + protected OpenSearchProjectIndexScanRule(Config config) { + super(config); + } + + // ~ Methods ---------------------------------------------------------------- + + protected static boolean test(CalciteOpenSearchIndexScan scan) { + final RelOptTable table = scan.getTable(); + return table.unwrap(OpenSearchIndex.class) != null; + } + + @Override + public void onMatch(RelOptRuleCall call) { + if (call.rels.length == 2) { + // the ordinary variant + final Project project = call.rel(0); + final CalciteOpenSearchIndexScan scan = call.rel(1); + apply(call, project, scan); + } else { + throw new AssertionError(); + } + } + + protected void apply(RelOptRuleCall call, Project project, CalciteOpenSearchIndexScan scan) { + final RelOptTable table = scan.getTable(); + requireNonNull(table.unwrap(OpenSearchIndex.class)); + + final List selectedColumns = new ArrayList<>(); + final RexVisitorImpl visitor = + new RexVisitorImpl(true) { + @Override + public Void visitInputRef(RexInputRef inputRef) { + if (!selectedColumns.contains(inputRef.getIndex())) { + selectedColumns.add(inputRef.getIndex()); + } + return null; + } + }; + visitor.visitEach(project.getProjects()); + + Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount()); + CalciteOpenSearchIndexScan newScan = scan.pushDownProject(selectedColumns); + final List newProjectRexNodes = RexUtil.apply(mapping, project.getProjects()); + + if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) { + call.transformTo(newScan); + } else { + call.transformTo(call.builder().push(newScan).project(newProjectRexNodes).build()); + } + } + + /** Rule configuration. */ + @Value.Immutable + public interface Config extends RelRule.Config { + /** Config that matches Project on OpenSearchProjectIndexScanRule. */ + Config DEFAULT = + ImmutableOpenSearchProjectIndexScanRule.Config.builder() + .build() + .withOperandSupplier( + b0 -> + b0.operand(Project.class) + .oneInput( + b1 -> + b1.operand(CalciteOpenSearchIndexScan.class) + .predicate(OpenSearchProjectIndexScanRule::test) + .noInputs())); + + @Override + default OpenSearchProjectIndexScanRule toRule() { + return new OpenSearchProjectIndexScanRule(this); + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index 34dd969ff8..6d55f3a3eb 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -55,14 +55,12 @@ import org.opensearch.sql.calcite.plan.OpenSearchConstants; /** - * Query predicate analyzer. Uses visitor pattern to traverse existing expression - * and convert it to {@link QueryBuilder} + * Query predicate analyzer. Uses visitor pattern to traverse existing expression and convert it to + * {@link QueryBuilder} */ public class PredicateAnalyzer { - /** - * Internal exception. - */ + /** Internal exception. */ @SuppressWarnings("serial") private static final class PredicateAnalyzerException extends RuntimeException { @@ -76,8 +74,8 @@ private static final class PredicateAnalyzerException extends RuntimeException { } /** - * Exception that is thrown when a {@link org.apache.calcite.rel.RelNode} - * expression cannot be processed (or converted into an Elasticsearch query). + * Exception that is thrown when a {@link org.apache.calcite.rel.RelNode} expression cannot be + * processed (or converted into an Elasticsearch query). */ public static class ExpressionNotAnalyzableException extends Exception { ExpressionNotAnalyzableException(String message, Throwable cause) { @@ -88,23 +86,23 @@ public static class ExpressionNotAnalyzableException extends Exception { private PredicateAnalyzer() {} /** - * Walks the expression tree, attempting to convert the entire tree into - * an equivalent OpenSearch query filter. If an error occurs, or if it - * is determined that the expression cannot be converted, an exception is - * thrown and an error message logged. + * Walks the expression tree, attempting to convert the entire tree into an equivalent OpenSearch + * query filter. If an error occurs, or if it is determined that the expression cannot be + * converted, an exception is thrown and an error message logged. * - *

Callers should catch ExpressionNotAnalyzableException - * and fall back to not using push-down filters. + *

Callers should catch ExpressionNotAnalyzableException and fall back to not using push-down + * filters. * * @param expression expression to analyze * @return search query which can be used to query OS cluster * @throws ExpressionNotAnalyzableException when expression can't processed by this analyzer */ - public static QueryBuilder analyze(RexNode expression, List mapping) throws ExpressionNotAnalyzableException { + public static QueryBuilder analyze(RexNode expression, List schema) + throws ExpressionNotAnalyzableException { requireNonNull(expression, "expression"); try { // visits expression tree - QueryExpression e = (QueryExpression) expression.accept(new Visitor(mapping)); + QueryExpression e = (QueryExpression) expression.accept(new Visitor(schema)); if (e != null && e.isPartial()) { throw new UnsupportedOperationException("Can't handle partial QueryExpression: " + e); @@ -116,96 +114,94 @@ public static QueryBuilder analyze(RexNode expression, List mapping) thr } } - /** - * Traverses {@link RexNode} tree and builds ES query. - */ + /** Traverses {@link RexNode} tree and builds ES query. */ private static class Visitor extends RexVisitorImpl { - List mapping; - private Visitor(List mapping) { + List schema; + + private Visitor(List schema) { super(true); - this.mapping = mapping; + this.schema = schema; } - @Override public Expression visitInputRef(RexInputRef inputRef) { - return new NamedFieldExpression(inputRef, mapping); + @Override + public Expression visitInputRef(RexInputRef inputRef) { + return new NamedFieldExpression(inputRef, schema); } - @Override public Expression visitLiteral(RexLiteral literal) { + @Override + public Expression visitLiteral(RexLiteral literal) { return new LiteralExpression(literal); } private static boolean supportedRexCall(RexCall call) { final SqlSyntax syntax = call.getOperator().getSyntax(); switch (syntax) { - case BINARY: - switch (call.getKind()) { - case CONTAINS: - case AND: - case OR: - case LIKE: - case EQUALS: - case NOT_EQUALS: - case GREATER_THAN: - case GREATER_THAN_OR_EQUAL: - case LESS_THAN: - case LESS_THAN_OR_EQUAL: - return true; - default: - return false; - } - case SPECIAL: - switch (call.getKind()) { - case CAST: - case LIKE: - case ITEM: - case OTHER_FUNCTION: - return true; - case CASE: - case SIMILAR: - default: - return false; - } - case FUNCTION: - return true; - case POSTFIX: - switch (call.getKind()) { - case IS_NOT_NULL: - case IS_NULL: - return true; - default: - return false; - } - case PREFIX: // NOT() - switch (call.getKind()) { - case NOT: + case BINARY: + switch (call.getKind()) { + case CONTAINS: + case AND: + case OR: + case LIKE: + case EQUALS: + case NOT_EQUALS: + case GREATER_THAN: + case GREATER_THAN_OR_EQUAL: + case LESS_THAN: + case LESS_THAN_OR_EQUAL: + return true; + default: + return false; + } + case SPECIAL: + switch (call.getKind()) { + case CAST: + case LIKE: + case ITEM: + case OTHER_FUNCTION: + return true; + case CASE: + case SIMILAR: + default: + return false; + } + case FUNCTION: return true; + case POSTFIX: + switch (call.getKind()) { + case IS_NOT_NULL: + case IS_NULL: + return true; + default: + return false; + } + case PREFIX: // NOT() + switch (call.getKind()) { + case NOT: + return true; + default: + return false; + } + case INTERNAL: + switch (call.getKind()) { + case SEARCH: + return canBeTranslatedToTermsQuery(call); + default: + return false; + } + case FUNCTION_ID: + case FUNCTION_STAR: default: return false; - } - case INTERNAL: - switch (call.getKind()) { - case SEARCH: - return canBeTranslatedToTermsQuery(call); - default: - return false; - } - case FUNCTION_ID: - case FUNCTION_STAR: - default: - return false; } } /** - * There are three types of the Sarg included in SEARCH RexCall: - * 1) Sarg is points (In ('a', 'b', 'c' ...)). - * In this case the search call can be translated to terms Query - * 2) Sarg is complementedPoints (Not in ('a', 'b')). - * In this case the search call can be translated to MustNot terms Query - * 3) Sarg is real Range( > 1 and <= 10). - * In this case the search call should be translated to rang Query - * Currently only the 1) and 2) cases are supported. + * There are three types of the Sarg included in SEARCH RexCall: 1) Sarg is points (In ('a', + * 'b', 'c' ...)). In this case the search call can be translated to terms Query 2) Sarg is + * complementedPoints (Not in ('a', 'b')). In this case the search call can be translated to + * MustNot terms Query 3) Sarg is real Range( > 1 and <= 10). In this case the search call + * should be translated to rang Query Currently only the 1) and 2) cases are supported. * * @param search SEARCH RexCall * @return true if it isSearchWithPoints or isSearchWithComplementedPoints, other false @@ -226,7 +222,8 @@ static boolean isSearchWithComplementedPoints(RexCall search) { return sarg.isComplementedPoints(); } - @Override public Expression visitCall(RexCall call) { + @Override + public Expression visitCall(RexCall call) { SqlSyntax syntax = call.getOperator().getSyntax(); if (!supportedRexCall(call)) { @@ -235,42 +232,40 @@ static boolean isSearchWithComplementedPoints(RexCall search) { } switch (syntax) { - case BINARY, INTERNAL: - return binary(call); - case POSTFIX: - return postfix(call); - case PREFIX: - return prefix(call); - case SPECIAL: - return switch (call.getKind()) { - case CAST -> toCastExpression(call); - case LIKE, CONTAINS -> binary(call); - default -> { - String message = String.format(Locale.ROOT, "Unsupported call: [%s]", call); - throw new PredicateAnalyzerException(message); + case BINARY, INTERNAL: + return binary(call); + case POSTFIX: + return postfix(call); + case PREFIX: + return prefix(call); + case SPECIAL: + return switch (call.getKind()) { + case CAST -> toCastExpression(call); + case LIKE, CONTAINS -> binary(call); + default -> { + String message = String.format(Locale.ROOT, "Unsupported call: [%s]", call); + throw new PredicateAnalyzerException(message); + } + }; + case FUNCTION: + if (call.getOperator().getName().equalsIgnoreCase("CONTAINS")) { + List operands = visitList(call.getOperands()); + String query = + convertQueryString( + operands.subList(0, operands.size() - 1), operands.get(operands.size() - 1)); + return QueryExpression.create(new NamedFieldExpression()).queryString(query); } - }; - case FUNCTION: - if (call.getOperator().getName().equalsIgnoreCase("CONTAINS")) { - List operands = visitList(call.getOperands()); - String query = - convertQueryString(operands.subList(0, operands.size() - 1), - operands.get(operands.size() - 1)); - return QueryExpression.create(new NamedFieldExpression()).queryString(query); - } - // fall through - default: - String message = - format(Locale.ROOT, - "Unsupported syntax [%s] for call: [%s]", syntax, call); - throw new PredicateAnalyzerException(message); + // fall through + default: + String message = + format(Locale.ROOT, "Unsupported syntax [%s] for call: [%s]", syntax, call); + throw new PredicateAnalyzerException(message); } } private static String convertQueryString(List fields, Expression query) { int index = 0; - checkArgument(query instanceof LiteralExpression, - "Query string must be a string literal"); + checkArgument(query instanceof LiteralExpression, "Query string must be a string literal"); String queryString = ((LiteralExpression) query).stringValue(); @SuppressWarnings("ModifiedButNotUsed") Map fieldMap = new LinkedHashMap<>(); @@ -289,8 +284,8 @@ private static String convertQueryString(List fields, Expression que } private QueryExpression prefix(RexCall call) { - checkArgument(call.getKind() == SqlKind.NOT, - "Expected %s got %s", SqlKind.NOT, call.getKind()); + checkArgument( + call.getKind() == SqlKind.NOT, "Expected %s got %s", SqlKind.NOT, call.getKind()); if (call.getOperands().size() != 1) { String message = String.format(Locale.ROOT, "Unsupported NOT operator: [%s]", call); @@ -302,8 +297,7 @@ private QueryExpression prefix(RexCall call) { } private QueryExpression postfix(RexCall call) { - checkArgument(call.getKind() == SqlKind.IS_NULL - || call.getKind() == SqlKind.IS_NOT_NULL); + checkArgument(call.getKind() == SqlKind.IS_NULL || call.getKind() == SqlKind.IS_NOT_NULL); if (call.getOperands().size() != 1) { String message = String.format(Locale.ROOT, "Unsupported operator: [%s]", call); throw new PredicateAnalyzerException(message); @@ -318,10 +312,10 @@ private QueryExpression postfix(RexCall call) { } /** - * Process a call which is a binary operation, transforming into an equivalent - * query expression. Note that the incoming call may be either a simple binary - * expression, such as {@code foo > 5}, or it may be several simple expressions connected - * by {@code AND} or {@code OR} operators, such as {@code foo > 5 AND bar = 'abc' AND 'rot' < 1} + * Process a call which is a binary operation, transforming into an equivalent query expression. + * Note that the incoming call may be either a simple binary expression, such as {@code foo > + * 5}, or it may be several simple expressions connected by {@code AND} or {@code OR} operators, + * such as {@code foo > 5 AND bar = 'abc' AND 'rot' < 1} * * @param call existing call * @return evaluated expression @@ -347,52 +341,52 @@ private QueryExpression binary(RexCall call) { || isColumn(pair.getKey(), call, OpenSearchConstants.METADATA_FIELD_INDEX, false) || isColumn(pair.getKey(), call, OpenSearchConstants.METADATA_FIELD_UID, false)) { switch (call.getKind()) { - case EQUALS: - case NOT_EQUALS: - break; - default: - throw new PredicateAnalyzerException( - "Cannot handle " + call.getKind() + " expression for _id field, " + call); + case EQUALS: + case NOT_EQUALS: + break; + default: + throw new PredicateAnalyzerException( + "Cannot handle " + call.getKind() + " expression for _id field, " + call); } } switch (call.getKind()) { - case CONTAINS: - return QueryExpression.create(pair.getKey()).contains(pair.getValue()); - case LIKE: - throw new UnsupportedOperationException("LIKE not yet supported"); - case EQUALS: - return QueryExpression.create(pair.getKey()).equals(pair.getValue()); - case NOT_EQUALS: - return QueryExpression.create(pair.getKey()).notEquals(pair.getValue()); - case GREATER_THAN: - if (swapped) { - return QueryExpression.create(pair.getKey()).lt(pair.getValue()); - } - return QueryExpression.create(pair.getKey()).gt(pair.getValue()); - case GREATER_THAN_OR_EQUAL: - if (swapped) { - return QueryExpression.create(pair.getKey()).lte(pair.getValue()); - } - return QueryExpression.create(pair.getKey()).gte(pair.getValue()); - case LESS_THAN: - if (swapped) { + case CONTAINS: + return QueryExpression.create(pair.getKey()).contains(pair.getValue()); + case LIKE: + throw new UnsupportedOperationException("LIKE not yet supported"); + case EQUALS: + return QueryExpression.create(pair.getKey()).equals(pair.getValue()); + case NOT_EQUALS: + return QueryExpression.create(pair.getKey()).notEquals(pair.getValue()); + case GREATER_THAN: + if (swapped) { + return QueryExpression.create(pair.getKey()).lt(pair.getValue()); + } return QueryExpression.create(pair.getKey()).gt(pair.getValue()); - } - return QueryExpression.create(pair.getKey()).lt(pair.getValue()); - case LESS_THAN_OR_EQUAL: - if (swapped) { + case GREATER_THAN_OR_EQUAL: + if (swapped) { + return QueryExpression.create(pair.getKey()).lte(pair.getValue()); + } return QueryExpression.create(pair.getKey()).gte(pair.getValue()); - } - return QueryExpression.create(pair.getKey()).lte(pair.getValue()); - case SEARCH: - if (isSearchWithComplementedPoints(call)) { - return QueryExpression.create(pair.getKey()).notIn(pair.getValue()); - } else { - return QueryExpression.create(pair.getKey()).in(pair.getValue()); - } - default: - break; + case LESS_THAN: + if (swapped) { + return QueryExpression.create(pair.getKey()).gt(pair.getValue()); + } + return QueryExpression.create(pair.getKey()).lt(pair.getValue()); + case LESS_THAN_OR_EQUAL: + if (swapped) { + return QueryExpression.create(pair.getKey()).gte(pair.getValue()); + } + return QueryExpression.create(pair.getKey()).lte(pair.getValue()); + case SEARCH: + if (isSearchWithComplementedPoints(call)) { + return QueryExpression.create(pair.getKey()).notIn(pair.getValue()); + } else { + return QueryExpression.create(pair.getKey()).in(pair.getValue()); + } + default: + break; } String message = String.format(Locale.ROOT, "Unable to handle call: [%s]", call); throw new PredicateAnalyzerException(message); @@ -420,21 +414,22 @@ private QueryExpression andOr(RexCall call) { } switch (call.getKind()) { - case OR: - if (partial) { - if (firstError != null) { - throw firstError; - } else { - final String message = String.format(Locale.ROOT, "Unable to handle call: [%s]", call); - throw new PredicateAnalyzerException(message); + case OR: + if (partial) { + if (firstError != null) { + throw firstError; + } else { + final String message = + String.format(Locale.ROOT, "Unable to handle call: [%s]", call); + throw new PredicateAnalyzerException(message); + } } - } - return CompoundQueryExpression.or(expressions); - case AND: - return CompoundQueryExpression.and(partial, expressions); - default: - String message = String.format(Locale.ROOT, "Unable to handle call: [%s]", call); - throw new PredicateAnalyzerException(message); + return CompoundQueryExpression.or(expressions); + case AND: + return CompoundQueryExpression.and(partial, expressions); + default: + String message = String.format(Locale.ROOT, "Unable to handle call: [%s]", call); + throw new PredicateAnalyzerException(message); } } @@ -469,10 +464,10 @@ boolean isSwapped() { /** * Swap order of operands such that the literal expression is always on the right. * - *

NOTE: Some combinations of operands are implicitly not supported and will - * cause an exception to be thrown. For example, we currently do not support - * comparing a literal to another literal as convention {@code 5 = 5}. Nor do we support - * comparing named fields to other named fields as convention {@code $0 = $1}. + *

NOTE: Some combinations of operands are implicitly not supported and will cause an + * exception to be thrown. For example, we currently do not support comparing a literal to + * another literal as convention {@code 5 = 5}. Nor do we support comparing named fields to + * other named fields as convention {@code $0 = $1}. * * @param left left expression * @param right right expression @@ -492,9 +487,11 @@ private static SwapResult swap(Expression left, Expression right) { if (literal == null || terminal == null) { String message = - String.format(Locale.ROOT, + String.format( + Locale.ROOT, "Unexpected combination of expressions [left: %s] [right: %s]", - left, right); + left, + right); throw new PredicateAnalyzerException(message); } @@ -514,9 +511,7 @@ private static NamedFieldExpression toNamedField(RexLiteral literal) { return new NamedFieldExpression(literal); } - /** - * Try to convert a generic expression into a literal expression. - */ + /** Try to convert a generic expression into a literal expression. */ private static LiteralExpression expressAsLiteral(Expression exp) { if (exp instanceof LiteralExpression) { @@ -526,8 +521,8 @@ private static LiteralExpression expressAsLiteral(Expression exp) { return null; } - private static boolean isColumn(Expression exp, RexNode node, - String columnName, boolean throwException) { + private static boolean isColumn( + Expression exp, RexNode node, String columnName, boolean throwException) { if (!(exp instanceof NamedFieldExpression)) { return false; } @@ -543,15 +538,10 @@ private static boolean isColumn(Expression exp, RexNode node, } } - /** - * Empty interface; exists only to define the type hierarchy. - */ - interface Expression { - } + /** Empty interface; exists only to define the type hierarchy. */ + interface Expression {} - /** - * Main expression operators (like {@code equals}, {@code gt}, {@code exists} etc.) - */ + /** Main expression operators (like {@code equals}, {@code gt}, {@code exists} etc.) */ abstract static class QueryExpression implements Expression { public abstract QueryBuilder builder(); @@ -562,9 +552,7 @@ public boolean isPartial() { public abstract QueryExpression contains(LiteralExpression literal); - /** - * Negate {@code this} QueryExpression (not the next one). - */ + /** Negate {@code this} QueryExpression (not the next one). */ public abstract QueryExpression not(); public abstract QueryExpression exists(); @@ -607,12 +595,9 @@ public static QueryExpression create(TerminalExpression expression) { throw new PredicateAnalyzerException(message); } } - } - /** - * Builds conjunctions / disjunctions based on existing expressions. - */ + /** Builds conjunctions / disjunctions based on existing expressions. */ static class CompoundQueryExpression extends QueryExpression { private final boolean partial; @@ -652,95 +637,110 @@ private CompoundQueryExpression(boolean partial, BoolQueryBuilder builder) { this.builder = requireNonNull(builder, "builder"); } - @Override public boolean isPartial() { + @Override + public boolean isPartial() { return partial; } - - @Override public QueryBuilder builder() { + @Override + public QueryBuilder builder() { return builder; } - @Override public QueryExpression not() { + @Override + public QueryExpression not() { return new CompoundQueryExpression(partial, boolQuery().mustNot(builder())); } - @Override public QueryExpression exists() { - throw new PredicateAnalyzerException("SqlOperatorImpl ['exists'] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression exists() { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['exists'] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression contains(LiteralExpression literal) { - throw new PredicateAnalyzerException("SqlOperatorImpl ['contains'] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression contains(LiteralExpression literal) { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['contains'] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression notExists() { - throw new PredicateAnalyzerException("SqlOperatorImpl ['notExists'] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression notExists() { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['notExists'] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression like(LiteralExpression literal) { - throw new PredicateAnalyzerException("SqlOperatorImpl ['like'] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression like(LiteralExpression literal) { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['like'] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression notLike(LiteralExpression literal) { - throw new PredicateAnalyzerException("SqlOperatorImpl ['notLike'] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression notLike(LiteralExpression literal) { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['notLike'] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression equals(LiteralExpression literal) { - throw new PredicateAnalyzerException("SqlOperatorImpl ['='] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression equals(LiteralExpression literal) { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['='] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression notEquals(LiteralExpression literal) { - throw new PredicateAnalyzerException("SqlOperatorImpl ['not'] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression notEquals(LiteralExpression literal) { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['not'] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression gt(LiteralExpression literal) { - throw new PredicateAnalyzerException("SqlOperatorImpl ['>'] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression gt(LiteralExpression literal) { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['>'] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression gte(LiteralExpression literal) { - throw new PredicateAnalyzerException("SqlOperatorImpl ['>='] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression gte(LiteralExpression literal) { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['>='] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression lt(LiteralExpression literal) { - throw new PredicateAnalyzerException("SqlOperatorImpl ['<'] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression lt(LiteralExpression literal) { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['<'] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression lte(LiteralExpression literal) { - throw new PredicateAnalyzerException("SqlOperatorImpl ['<='] " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression lte(LiteralExpression literal) { + throw new PredicateAnalyzerException( + "SqlOperatorImpl ['<='] " + "cannot be applied to a compound expression"); } - @Override public QueryExpression queryString(String query) { - throw new PredicateAnalyzerException("QueryString " - + "cannot be applied to a compound expression"); + @Override + public QueryExpression queryString(String query) { + throw new PredicateAnalyzerException( + "QueryString " + "cannot be applied to a compound expression"); } - @Override public QueryExpression isTrue() { + @Override + public QueryExpression isTrue() { throw new PredicateAnalyzerException("isTrue cannot be applied to a compound expression"); } - @Override public QueryExpression in(LiteralExpression literal) { + @Override + public QueryExpression in(LiteralExpression literal) { throw new PredicateAnalyzerException("in cannot be applied to a compound expression"); } - @Override public QueryExpression notIn(LiteralExpression literal) { + @Override + public QueryExpression notIn(LiteralExpression literal) { throw new PredicateAnalyzerException("notIn cannot be applied to a compound expression"); } } - /** - * Usually basic expression of type {@code a = 'val'} or {@code b > 42}. - */ + /** Usually basic expression of type {@code a = 'val'} or {@code b > 42}. */ static class SimpleQueryExpression extends QueryExpression { private final NamedFieldExpression rel; @@ -754,52 +754,62 @@ private SimpleQueryExpression(NamedFieldExpression rel) { this.rel = rel; } - @Override public QueryBuilder builder() { + @Override + public QueryBuilder builder() { if (builder == null) { throw new IllegalStateException("Builder was not initialized"); } return builder; } - @Override public QueryExpression not() { + @Override + public QueryExpression not() { builder = boolQuery().mustNot(builder()); return this; } - @Override public QueryExpression exists() { + @Override + public QueryExpression exists() { builder = existsQuery(getFieldReference()); return this; } - @Override public QueryExpression notExists() { + @Override + public QueryExpression notExists() { // Even though Lucene doesn't allow a stand alone mustNot boolean query, // Elasticsearch handles this problem transparently on its end builder = boolQuery().mustNot(existsQuery(getFieldReference())); return this; } - @Override public QueryExpression like(LiteralExpression literal) { + @Override + public QueryExpression like(LiteralExpression literal) { builder = regexpQuery(getFieldReference(), literal.stringValue()); return this; } - @Override public QueryExpression contains(LiteralExpression literal) { + @Override + public QueryExpression contains(LiteralExpression literal) { builder = matchQuery(getFieldReference(), literal.value()); return this; } - @Override public QueryExpression notLike(LiteralExpression literal) { - builder = boolQuery() + @Override + public QueryExpression notLike(LiteralExpression literal) { + builder = + boolQuery() // NOT LIKE should return false when field is NULL .must(existsQuery(getFieldReference())) .mustNot(regexpQuery(getFieldReference(), literal.stringValue())); return this; } - @Override public QueryExpression equals(LiteralExpression literal) { + @Override + public QueryExpression equals(LiteralExpression literal) { Object value = literal.value(); if (value instanceof GregorianCalendar) { - builder = boolQuery() + builder = + boolQuery() .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gte(value))) .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lte(value))); } else { @@ -808,14 +818,17 @@ private SimpleQueryExpression(NamedFieldExpression rel) { return this; } - @Override public QueryExpression notEquals(LiteralExpression literal) { + @Override + public QueryExpression notEquals(LiteralExpression literal) { Object value = literal.value(); if (value instanceof GregorianCalendar) { - builder = boolQuery() + builder = + boolQuery() .should(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gt(value))) .should(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lt(value))); } else { - builder = boolQuery() + builder = + boolQuery() // NOT LIKE should return false when field is NULL .must(existsQuery(getFieldReference())) .mustNot(termQuery(getFieldReference(), value)); @@ -823,55 +836,60 @@ private SimpleQueryExpression(NamedFieldExpression rel) { return this; } - @Override public QueryExpression gt(LiteralExpression literal) { + @Override + public QueryExpression gt(LiteralExpression literal) { Object value = literal.value(); - builder = - addFormatIfNecessary(literal, - rangeQuery(getFieldReference()).gt(value)); + builder = addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gt(value)); return this; } - @Override public QueryExpression gte(LiteralExpression literal) { + @Override + public QueryExpression gte(LiteralExpression literal) { Object value = literal.value(); builder = addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gte(value)); return this; } - @Override public QueryExpression lt(LiteralExpression literal) { + @Override + public QueryExpression lt(LiteralExpression literal) { Object value = literal.value(); builder = addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lt(value)); return this; } - @Override public QueryExpression lte(LiteralExpression literal) { + @Override + public QueryExpression lte(LiteralExpression literal) { Object value = literal.value(); builder = addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lte(value)); return this; } - @Override public QueryExpression queryString(String query) { + @Override + public QueryExpression queryString(String query) { throw new UnsupportedOperationException("QueryExpression not yet supported: " + query); } - @Override public QueryExpression isTrue() { + @Override + public QueryExpression isTrue() { builder = termQuery(getFieldReference(), true); return this; } - @Override public QueryExpression in(LiteralExpression literal) { + @Override + public QueryExpression in(LiteralExpression literal) { Iterable iterable = (Iterable) literal.value(); builder = termsQuery(getFieldReference(), iterable); return this; } - @Override public QueryExpression notIn(LiteralExpression literal) { + @Override + public QueryExpression notIn(LiteralExpression literal) { Iterable iterable = (Iterable) literal.value(); builder = boolQuery().mustNot(termsQuery(getFieldReference(), iterable)); return this; } } - /** * By default, range queries on date/time need use the format of the source to parse the literal. * So we need to specify that the literal has "date_time" format @@ -880,26 +898,22 @@ private SimpleQueryExpression(NamedFieldExpression rel) { * @param rangeQueryBuilder query builder to optionally add {@code format} expression * @return existing builder with possible {@code format} attribute */ - private static RangeQueryBuilder addFormatIfNecessary(LiteralExpression literal, - RangeQueryBuilder rangeQueryBuilder) { + private static RangeQueryBuilder addFormatIfNecessary( + LiteralExpression literal, RangeQueryBuilder rangeQueryBuilder) { if (literal.value() instanceof GregorianCalendar) { rangeQueryBuilder.format("date_time"); } return rangeQueryBuilder; } - /** - * Empty interface; exists only to define the type hierarchy. - */ - interface TerminalExpression extends Expression { - } + /** Empty interface; exists only to define the type hierarchy. */ + interface TerminalExpression extends Expression {} - /** - * SQL cast. For example, {@code cast(col as INTEGER)}. - */ + /** SQL cast. For example, {@code cast(col as INTEGER)}. */ static final class CastExpression implements TerminalExpression { @SuppressWarnings("unused") private final RelDataType type; + private final TerminalExpression argument; private CastExpression(RelDataType type, TerminalExpression argument) { @@ -921,12 +935,9 @@ static TerminalExpression unpack(TerminalExpression exp) { static boolean isCastExpression(Expression exp) { return exp instanceof CastExpression; } - } - /** - * Used for bind variables. - */ + /** Used for bind variables. */ static final class NamedFieldExpression implements TerminalExpression { private final String name; @@ -935,8 +946,9 @@ private NamedFieldExpression() { this.name = null; } - private NamedFieldExpression(RexInputRef ref, List mapping) { - this.name = (ref == null || ref.getIndex() >= mapping.size()) ? null : mapping.get(ref.getIndex()); + private NamedFieldExpression(RexInputRef ref, List schema) { + this.name = + (ref == null || ref.getIndex() >= schema.size()) ? null : schema.get(ref.getIndex()); } private NamedFieldExpression(RexLiteral literal) { @@ -956,9 +968,7 @@ String getReference() { } } - /** - * Literal like {@code 'foo' or 42 or true} etc. - */ + /** Literal like {@code 'foo' or 42 or true} etc. */ static final class LiteralExpression implements TerminalExpression { final RexLiteral literal; @@ -1027,23 +1037,21 @@ List sargValue() { final SqlTypeName sqlTypeName = type.getSqlTypeName(); if (sarg.isPoints()) { Set ranges = sarg.rangeSet.asRanges(); - ranges.forEach(range -> - values.add(sargPointValue(range.lowerEndpoint(), sqlTypeName))); + ranges.forEach(range -> values.add(sargPointValue(range.lowerEndpoint(), sqlTypeName))); } else if (sarg.isComplementedPoints()) { Set ranges = sarg.negate().rangeSet.asRanges(); - ranges.forEach(range -> - values.add(sargPointValue(range.lowerEndpoint(), sqlTypeName))); + ranges.forEach(range -> values.add(sargPointValue(range.lowerEndpoint(), sqlTypeName))); } return values; } Object sargPointValue(Object point, SqlTypeName sqlTypeName) { switch (sqlTypeName) { - case CHAR: - case VARCHAR: - return ((NlsString) point).getValue(); - default: - return point; + case CHAR: + case VARCHAR: + return ((NlsString) point).getValue(); + default: + return point; } } @@ -1053,8 +1061,8 @@ Object rawValue() { } /** - * If one operand in a binary operator is a DateTime type, but the other isn't, - * we should not push down the predicate. + * If one operand in a binary operator is a DateTime type, but the other isn't, we should not push + * down the predicate. * * @param call Current node being evaluated */ @@ -1062,15 +1070,15 @@ private static void checkForIncompatibleDateTimeOperands(RexCall call) { RelDataType op1 = call.getOperands().get(0).getType(); RelDataType op2 = call.getOperands().get(1).getType(); if ((SqlTypeFamily.DATETIME.contains(op1) && !SqlTypeFamily.DATETIME.contains(op2)) - || (SqlTypeFamily.DATETIME.contains(op2) && !SqlTypeFamily.DATETIME.contains(op1)) - || (SqlTypeFamily.DATE.contains(op1) && !SqlTypeFamily.DATE.contains(op2)) - || (SqlTypeFamily.DATE.contains(op2) && !SqlTypeFamily.DATE.contains(op1)) - || (SqlTypeFamily.TIMESTAMP.contains(op1) && !SqlTypeFamily.TIMESTAMP.contains(op2)) - || (SqlTypeFamily.TIMESTAMP.contains(op2) && !SqlTypeFamily.TIMESTAMP.contains(op1)) - || (SqlTypeFamily.TIME.contains(op1) && !SqlTypeFamily.TIME.contains(op2)) - || (SqlTypeFamily.TIME.contains(op2) && !SqlTypeFamily.TIME.contains(op1))) { - throw new PredicateAnalyzerException("Cannot handle " + call.getKind() - + " expression for _id field, " + call); + || (SqlTypeFamily.DATETIME.contains(op2) && !SqlTypeFamily.DATETIME.contains(op1)) + || (SqlTypeFamily.DATE.contains(op1) && !SqlTypeFamily.DATE.contains(op2)) + || (SqlTypeFamily.DATE.contains(op2) && !SqlTypeFamily.DATE.contains(op1)) + || (SqlTypeFamily.TIMESTAMP.contains(op1) && !SqlTypeFamily.TIMESTAMP.contains(op2)) + || (SqlTypeFamily.TIMESTAMP.contains(op2) && !SqlTypeFamily.TIMESTAMP.contains(op1)) + || (SqlTypeFamily.TIME.contains(op1) && !SqlTypeFamily.TIME.contains(op2)) + || (SqlTypeFamily.TIME.contains(op2) && !SqlTypeFamily.TIME.contains(op1))) { + throw new PredicateAnalyzerException( + "Cannot handle " + call.getKind() + " expression for _id field, " + call); } } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 7e328beac8..b7b98116fe 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -6,21 +6,20 @@ package org.opensearch.sql.opensearch.storage; import com.google.common.annotations.VisibleForTesting; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.Function; +import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.rex.RexNode; -import org.checkerframework.checker.nullness.qual.Nullable; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; import org.opensearch.common.unit.TimeValue; -import org.opensearch.index.query.QueryBuilder; import org.opensearch.sql.calcite.plan.OpenSearchTable; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.type.ExprCoreType; @@ -33,9 +32,8 @@ import org.opensearch.sql.opensearch.planner.physical.MLOperator; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; -import org.opensearch.sql.opensearch.request.PredicateAnalyzer; -import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException; import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest; +import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan; import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexEnumerator; import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScan; import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScanBuilder; @@ -68,7 +66,7 @@ public class OpenSearchIndex extends OpenSearchTable { METADATA_FIELD_ROUTING, ExprCoreType.STRING); /** OpenSearch client connection. */ - private final OpenSearchClient client; + @Getter private final OpenSearchClient client; private final Settings settings; @@ -81,8 +79,6 @@ public class OpenSearchIndex extends OpenSearchTable { /** The cached ExprType of fields. */ private Map cachedFieldTypes = null; - private List mapping = null; - /** The cached max result window setting of index. */ private Integer cachedMaxResultWindow = null; @@ -94,6 +90,12 @@ public OpenSearchIndex(OpenSearchClient client, Settings settings, String indexN this.indexName = new OpenSearchRequest.IndexName(indexName); } + @Override + public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { + final RelOptCluster cluster = context.getCluster(); + return new CalciteOpenSearchIndexScan(cluster, relOptTable, this); + } + @Override public boolean exists() { return client.exists(indexName.toString()); @@ -135,7 +137,6 @@ public Map getFieldTypes() { LinkedHashMap::new, (map, item) -> map.put(item.getKey(), item.getValue().getExprType()), Map::putAll); - mapping = cachedFieldTypes.keySet().stream().toList(); } return cachedFieldTypes; } @@ -201,48 +202,6 @@ public boolean isFieldTypeTolerance() { return settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE); } - // @Override - public Enumerable scan(DataContext root) { - return new AbstractEnumerable<@Nullable Object[]>() { - @Override - public Enumerator<@Nullable Object[]> enumerator() { - return null; - // return search().toMap(v -> new Object[] {v}); - } - }; - } - - @Override - public Enumerable scan(DataContext dataContext, List list, - int @Nullable [] ints) { - return new AbstractEnumerable() { - @Override - public Enumerator enumerator() { - final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT); - - final TimeValue cursorKeepAlive = - settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); - var builder = - new OpenSearchRequestBuilder(querySizeLimit, createExprValueFactory(), settings); - if (!list.isEmpty()) { - try { - QueryBuilder filter = PredicateAnalyzer.analyze(list.getFirst(), mapping); - builder.pushDownFilter(filter); - } catch (ExpressionNotAnalyzableException e) { - throw new RuntimeException(e); - } - } - if (ints != null) { - builder.pushDownProjectStream(Arrays.stream(ints).mapToObj(i -> mapping.get(i))); - } - return new OpenSearchIndexEnumerator( - client, - builder.getMaxResponseSize(), - builder.build(indexName, getMaxResultWindow(), cursorKeepAlive, client)); - } - }; - } - @VisibleForTesting @RequiredArgsConstructor public static class OpenSearchDefaultImplementor extends DefaultImplementor { @@ -270,10 +229,10 @@ public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) { } @Override - public Enumerable search() { - return new AbstractEnumerable() { + public Enumerable search() { + return new AbstractEnumerable<>() { @Override - public Enumerator enumerator() { + public Enumerator enumerator() { final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT); final TimeValue cursorKeepAlive = @@ -288,4 +247,16 @@ public Enumerator enumerator() { } }; } + + public OpenSearchRequestBuilder createRequestBuilder() { + return new OpenSearchRequestBuilder( + settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), + this.createExprValueFactory(), + settings); + } + + public OpenSearchRequest buildRequest(OpenSearchRequestBuilder requestBuilder) { + final TimeValue cursorKeepAlive = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); + return requestBuilder.build(indexName, getMaxResultWindow(), cursorKeepAlive, client); + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java new file mode 100644 index 0000000000..276c245a50 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java @@ -0,0 +1,148 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage.scan; + +import static java.util.Objects.requireNonNull; + +import java.util.List; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.sql.calcite.plan.OpenSearchTableScan; +import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules; +import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex; + +/** Relational expression representing a scan of an OpenSearchIndex type. */ +public class CalciteOpenSearchIndexScan extends OpenSearchTableScan { + private static final Logger LOG = LogManager.getLogger(CalciteOpenSearchIndexScan.class); + + private final OpenSearchIndex osIndex; + private final OpenSearchRequestBuilder requestBuilder; + private final RelDataType schema; + + /** + * Creates an CalciteOpenSearchIndexScan. + * + * @param cluster Cluster + * @param table Table + * @param index OpenSearch index + */ + public CalciteOpenSearchIndexScan( + RelOptCluster cluster, RelOptTable table, OpenSearchIndex index) { + this(cluster, table, index, index.createRequestBuilder(), table.getRowType()); + } + + public CalciteOpenSearchIndexScan( + RelOptCluster cluster, + RelOptTable table, + OpenSearchIndex index, + OpenSearchRequestBuilder requestBuilder, + RelDataType schema) { + super(cluster, table); + this.osIndex = requireNonNull(index, "OpenSearch index"); + this.requestBuilder = requestBuilder; + this.schema = schema; + } + + public CalciteOpenSearchIndexScan copyWithNewSchema(RelDataType schema) { + // TODO: need to do deep-copy on requestBuilder in case non-idempotent push down. + return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex, requestBuilder, schema); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + assert inputs.isEmpty(); + return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex); + } + + @Override + public void register(RelOptPlanner planner) { + super.register(planner); + for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) { + planner.addRule(rule); + } + } + + @Override + public RelDataType deriveRowType() { + return this.schema; + } + + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + // Avoid optimizing the java row type since the scan will always return an array. + PhysType physType = + PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray(), false); + + Expression scanOperator = implementor.stash(this, CalciteOpenSearchIndexScan.class); + return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan"))); + } + + public Enumerable<@Nullable Object> scan() { + return new AbstractEnumerable<>() { + @Override + public Enumerator enumerator() { + return new OpenSearchIndexEnumerator( + osIndex.getClient(), + List.copyOf(getRowType().getFieldNames()), + requestBuilder.getMaxResponseSize(), + osIndex.buildRequest(requestBuilder)); + } + }; + } + + public boolean pushDownFilter(Filter filter) { + try { + List schema = this.getRowType().getFieldNames(); + QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema); + requestBuilder.pushDownFilter(filterBuilder); + // TODO: handle the case where condition contains a score function + return true; + } catch (ExpressionNotAnalyzableException e) { + LOG.warn("Cannot analyze the filter condition {}", filter.getCondition(), e); + } + return false; + } + + /** + * When pushing down a project, we need to create a new CalciteOpenSearchIndexScan with the + * updated schema since we cannot override getRowType() which is defined to be final. + */ + public CalciteOpenSearchIndexScan pushDownProject(List selectedColumns) { + final RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder(); + final List fieldList = this.getRowType().getFieldList(); + for (int project : selectedColumns) { + builder.add(fieldList.get(project)); + } + RelDataType newSchema = builder.build(); + CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema); + newScan.requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()); + return newScan; + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index 3886c38267..161badffb6 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -16,7 +16,7 @@ import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; -public class OpenSearchIndexEnumerator implements Enumerator { +public class OpenSearchIndexEnumerator implements Enumerator { /** OpenSearch client. */ private final OpenSearchClient client; From 8c4049507df28e47567c38e6bf758c585419c9fa Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Wed, 19 Feb 2025 15:36:09 +0800 Subject: [PATCH 3/4] Address comments Signed-off-by: Heng Qian --- .../opensearch/sql/executor/QueryService.java | 2 +- .../OpenSearchFilterIndexScanRule.java | 28 ++++--------- .../OpenSearchProjectIndexScanRule.java | 23 +++------- .../opensearch/request/PredicateAnalyzer.java | 42 +++++++++---------- .../scan/CalciteOpenSearchIndexScan.java | 3 +- 5 files changed, 37 insertions(+), 61 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index cb51413ebe..456976f063 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -84,7 +84,7 @@ public void execute( }); } catch (Exception e) { LOG.warn("Fallback to V2 query engine since got exception", e); - // executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);} + executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); } } } catch (Exception e) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java index 6ca00acb16..f8beb339fe 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java @@ -1,18 +1,6 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.sql.opensearch.planner.physical; @@ -20,12 +8,11 @@ import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Project; import org.immutables.value.Value; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan; -/** Planner rule that push a {@link Project} down to {@link CalciteOpenSearchIndexScan} */ +/** Planner rule that push a {@link Filter} down to {@link CalciteOpenSearchIndexScan} */ @Value.Enclosing public class OpenSearchFilterIndexScanRule extends RelRule { @@ -34,8 +21,6 @@ protected OpenSearchFilterIndexScanRule(Config config) { super(config); } - // ~ Methods ---------------------------------------------------------------- - protected static boolean test(CalciteOpenSearchIndexScan scan) { final RelOptTable table = scan.getTable(); return table.unwrap(OpenSearchIndex.class) != null; @@ -49,7 +34,10 @@ public void onMatch(RelOptRuleCall call) { final CalciteOpenSearchIndexScan scan = call.rel(1); apply(call, filter, scan); } else { - throw new AssertionError(); + throw new AssertionError( + String.format( + "The length of rels should be %s but got %s", + this.operands.size(), call.rels.length)); } } @@ -62,7 +50,7 @@ protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexS /** Rule configuration. */ @Value.Immutable public interface Config extends RelRule.Config { - /** Config that matches Project on CalciteOpenSearchIndexScan. */ + /** Config that matches Filter on CalciteOpenSearchIndexScan. */ Config DEFAULT = ImmutableOpenSearchFilterIndexScanRule.Config.builder() .build() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java index ca847e5f82..0e6c755fa4 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java @@ -1,18 +1,6 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.sql.opensearch.planner.physical; @@ -43,8 +31,6 @@ protected OpenSearchProjectIndexScanRule(Config config) { super(config); } - // ~ Methods ---------------------------------------------------------------- - protected static boolean test(CalciteOpenSearchIndexScan scan) { final RelOptTable table = scan.getTable(); return table.unwrap(OpenSearchIndex.class) != null; @@ -58,7 +44,10 @@ public void onMatch(RelOptRuleCall call) { final CalciteOpenSearchIndexScan scan = call.rel(1); apply(call, project, scan); } else { - throw new AssertionError(); + throw new AssertionError( + String.format( + "The length of rels should be %s but got %s", + this.operands.size(), call.rels.length)); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index 6d55f3a3eb..a755d1cf0f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -1,18 +1,6 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.sql.opensearch.request; @@ -57,12 +45,19 @@ /** * Query predicate analyzer. Uses visitor pattern to traverse existing expression and convert it to * {@link QueryBuilder} + * + *

Major part of this class have been copied from calcite ES adapter, but it has been changed to support the + * OpenSearch QueryBuilder. + * + *

And that file was also sourced from dremio ES adapter + * (thanks to their team for improving calcite-ES integration). */ public class PredicateAnalyzer { /** Internal exception. */ @SuppressWarnings("serial") - private static final class PredicateAnalyzerException extends RuntimeException { + public static final class PredicateAnalyzerException extends RuntimeException { PredicateAnalyzerException(String message) { super(message); @@ -75,7 +70,7 @@ private static final class PredicateAnalyzerException extends RuntimeException { /** * Exception that is thrown when a {@link org.apache.calcite.rel.RelNode} expression cannot be - * processed (or converted into an Elasticsearch query). + * processed (or converted into an OpenSearch query). */ public static class ExpressionNotAnalyzableException extends Exception { ExpressionNotAnalyzableException(String message, Throwable cause) { @@ -102,12 +97,13 @@ public static QueryBuilder analyze(RexNode expression, List schema) requireNonNull(expression, "expression"); try { // visits expression tree - QueryExpression e = (QueryExpression) expression.accept(new Visitor(schema)); + QueryExpression queryExpression = (QueryExpression) expression.accept(new Visitor(schema)); - if (e != null && e.isPartial()) { - throw new UnsupportedOperationException("Can't handle partial QueryExpression: " + e); + if (queryExpression != null && queryExpression.isPartial()) { + throw new UnsupportedOperationException( + "Can't handle partial QueryExpression: " + queryExpression); } - return e != null ? e.builder() : null; + return queryExpression != null ? queryExpression.builder() : null; } catch (Throwable e) { Throwables.throwIfInstanceOf(e, UnsupportedOperationException.class); throw new ExpressionNotAnalyzableException("Can't convert " + expression, e); @@ -303,7 +299,7 @@ private QueryExpression postfix(RexCall call) { throw new PredicateAnalyzerException(message); } Expression a = call.getOperands().get(0).accept(this); - // Elasticsearch does not want is null/is not null (exists query) + // OpenSearch does not want is null/is not null (exists query) // for _id and _index, although it supports for all other metadata column isColumn(a, call, OpenSearchConstants.METADATA_FIELD_ID, true); isColumn(a, call, OpenSearchConstants.METADATA_FIELD_INDEX, true); @@ -777,7 +773,7 @@ public QueryExpression exists() { @Override public QueryExpression notExists() { // Even though Lucene doesn't allow a stand alone mustNot boolean query, - // Elasticsearch handles this problem transparently on its end + // OpenSearch handles this problem transparently on its end builder = boolQuery().mustNot(existsQuery(getFieldReference())); return this; } @@ -813,6 +809,8 @@ public QueryExpression equals(LiteralExpression literal) { .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gte(value))) .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lte(value))); } else { + // TODO: equal(textFieldType, "value") should not rewrite as termQuery, + // it should be addressed by issue: https://github.com/opensearch-project/sql/issues/3334 builder = termQuery(getFieldReference(), value); } return this; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java index 276c245a50..20fb52c112 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java @@ -36,6 +36,7 @@ import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.request.PredicateAnalyzer; import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer.PredicateAnalyzerException; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; /** Relational expression representing a scan of an OpenSearchIndex type. */ @@ -124,7 +125,7 @@ public boolean pushDownFilter(Filter filter) { requestBuilder.pushDownFilter(filterBuilder); // TODO: handle the case where condition contains a score function return true; - } catch (ExpressionNotAnalyzableException e) { + } catch (ExpressionNotAnalyzableException | PredicateAnalyzerException e) { LOG.warn("Cannot analyze the filter condition {}", filter.getCondition(), e); } return false; From 558eed99cc1b6a23d9baed9aaaaa05459ff53808 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Fri, 21 Feb 2025 10:30:18 +0800 Subject: [PATCH 4/4] Add original license for PredicateAnalyzer Signed-off-by: Heng Qian --- .../opensearch/request/PredicateAnalyzer.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index a755d1cf0f..a9f35f74ba 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -2,6 +2,28 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ + +/* + * This file contains code from the Apache Spark project (original license below). + * It contains modifications, which are licensed as above: + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.opensearch.sql.opensearch.request; import static com.google.common.base.Preconditions.checkArgument;