Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Calcite Engine] Push down project and filter operator into index scan #3327

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
Expand All @@ -16,15 +15,13 @@
public class CalcitePlanContext {

public FrameworkConfig config;
public CalciteConnection connection;
public final RelBuilder relBuilder;
public final ExtendedRexBuilder rexBuilder;

@Getter private boolean isResolvingJoinCondition = false;

public CalcitePlanContext(FrameworkConfig config, CalciteConnection connection) {
public CalcitePlanContext(FrameworkConfig config) {
this.config = config;
this.connection = connection;
this.relBuilder = RelBuilder.create(config);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
}
Expand All @@ -40,6 +37,6 @@ public RexNode resolveJoinCondition(

// for testing only
public static CalcitePlanContext create(FrameworkConfig config) {
return new CalcitePlanContext(config, null);
return new CalcitePlanContext(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.SchemaPlus;
Expand All @@ -33,12 +30,6 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
return OpenSearchRelDataTypes.convertSchema(this);
}

@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
final RelOptCluster cluster = context.getCluster();
return new OpenSearchTableScan(cluster, relOptTable, this);
}

@Override
public <T> Queryable<T> asQueryable(
QueryProvider queryProvider, SchemaPlus schema, String tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,26 @@

package org.opensearch.sql.calcite.plan;

import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.linq4j.tree.Blocks;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.rules.CoreRules;

/** Relational expression representing a scan of an OpenSearch type. */
public class OpenSearchTableScan extends TableScan implements EnumerableRel {
private final OpenSearchTable osTable;

public abstract class OpenSearchTableScan extends TableScan implements EnumerableRel {
/**
* Creates an OpenSearchTableScan.
*
* @param cluster Cluster
* @param table Table
* @param osTable OpenSearch table
*/
OpenSearchTableScan(RelOptCluster cluster, RelOptTable table, OpenSearchTable osTable) {
protected OpenSearchTableScan(RelOptCluster cluster, RelOptTable table) {
super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), ImmutableList.of(), table);
this.osTable = requireNonNull(osTable, "OpenSearch table");
}

@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.isEmpty();
return new OpenSearchTableScan(getCluster(), table, osTable);
}

@Override
Expand All @@ -57,16 +37,4 @@ public void register(RelOptPlanner planner) {
// it is converted to cardinality aggregation in OpenSearch
planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES);
}

@Override
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
PhysType physType =
PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());

return implementor.result(
physType,
Blocks.toBlock(
Expressions.call(
requireNonNull(table.getExpression(OpenSearchTable.class)), "search")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalciteJdbc41Factory;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
Expand Down Expand Up @@ -80,29 +77,8 @@ public void execute(
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
// Use simple calcite schema since we don't compute tables in advance of the
// query.
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
CalciteConnection connection =
factory.newConnection(
new Driver(),
factory,
"",
new java.util.Properties(),
rootSchema,
null);
final SchemaPlus defaultSchema =
connection
.getRootSchema()
.add(
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME,
new OpenSearchSchema(dataSourceService));
// Set opensearch schema as the default schema in config, otherwise we need to
// explicitly
// add schema path 'OpenSearch' before the opensearch table name
final FrameworkConfig config = buildFrameworkConfig(defaultSchema);
final CalcitePlanContext context = new CalcitePlanContext(config, connection);
final FrameworkConfig config = buildFrameworkConfig();
final CalcitePlanContext context = new CalcitePlanContext(config);
executePlanByCalcite(analyze(plan, context), context, listener);
return null;
});
Expand Down Expand Up @@ -174,10 +150,15 @@ public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) {
return relNodeVisitor.analyze(plan, context);
}

private FrameworkConfig buildFrameworkConfig(SchemaPlus defaultSchema) {
private FrameworkConfig buildFrameworkConfig() {
// Use simple calcite schema since we don't compute tables in advance of the query.
final SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
final SchemaPlus opensearchSchema =
rootSchema.add(
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, new OpenSearchSchema(dataSourceService));
return Frameworks.newConfigBuilder()
.parserConfig(SqlParser.Config.DEFAULT) // TODO check
.defaultSchema(defaultSchema)
.defaultSchema(opensearchSchema)
.traitDefs((List<RelTraitDef>) null)
.programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE))
.typeSystem(OpenSearchTypeSystem.INSTANCE)
Expand Down
3 changes: 3 additions & 0 deletions opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ dependencies {
compileOnly group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}"
implementation group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

annotationProcessor 'org.immutables:value:2.8.8'
compileOnly 'org.immutables:value-annotations:2.8.8'

testImplementation('org.junit.jupiter:junit-jupiter-api:5.9.3')
testImplementation('org.junit.jupiter:junit-jupiter-params:5.9.3')
testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.9.3')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand All @@ -18,7 +17,7 @@
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.RelRunner;
import org.apache.calcite.tools.RelRunners;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprTupleValue;
Expand Down Expand Up @@ -111,13 +110,9 @@ public void execute(
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
Connection connection = context.connection;
try {
RelRunner relRunner = connection.unwrap(RelRunner.class);
try (PreparedStatement statement = relRunner.prepareStatement(rel)) {
ResultSet resultSet = statement.executeQuery();
buildResultSet(resultSet, listener);
}
try (PreparedStatement statement = RelRunners.run(rel)) {
ResultSet result = statement.executeQuery();
buildResultSet(result, listener);
return null;
} catch (SQLException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.sql.opensearch.planner.physical;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.core.Filter;
import org.immutables.value.Value;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan;

/** Planner rule that push a {@link Filter} down to {@link CalciteOpenSearchIndexScan} */
@Value.Enclosing
public class OpenSearchFilterIndexScanRule extends RelRule<OpenSearchFilterIndexScanRule.Config> {

/** Creates a OpenSearchFilterIndexScanRule. */
protected OpenSearchFilterIndexScanRule(Config config) {
super(config);
}

protected static boolean test(CalciteOpenSearchIndexScan scan) {
final RelOptTable table = scan.getTable();
return table.unwrap(OpenSearchIndex.class) != null;
}

@Override
public void onMatch(RelOptRuleCall call) {
if (call.rels.length == 2) {
// the ordinary variant
final Filter filter = call.rel(0);
final CalciteOpenSearchIndexScan scan = call.rel(1);
apply(call, filter, scan);
} else {
throw new AssertionError(
String.format(
"The length of rels should be %s but got %s",
this.operands.size(), call.rels.length));
}
}

protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) {
if (scan.pushDownFilter(filter)) {
call.transformTo(scan);
}
}

/** Rule configuration. */
@Value.Immutable
public interface Config extends RelRule.Config {
/** Config that matches Filter on CalciteOpenSearchIndexScan. */
Config DEFAULT =
ImmutableOpenSearchFilterIndexScanRule.Config.builder()
.build()
.withOperandSupplier(
b0 ->
b0.operand(Filter.class)
.oneInput(
b1 ->
b1.operand(CalciteOpenSearchIndexScan.class)
.predicate(OpenSearchFilterIndexScanRule::test)
.noInputs()));

@Override
default OpenSearchFilterIndexScanRule toRule() {
return new OpenSearchFilterIndexScanRule(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.planner.physical;

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;

public class OpenSearchIndexRules {
private static final OpenSearchProjectIndexScanRule PROJECT_INDEX_SCAN =
OpenSearchProjectIndexScanRule.Config.DEFAULT.toRule();
private static final OpenSearchFilterIndexScanRule FILTER_INDEX_SCAN =
OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule();

public static final List<RelOptRule> OPEN_SEARCH_INDEX_SCAN_RULES =
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN);

// prevent instantiation
private OpenSearchIndexRules() {}
}
Loading
Loading