Skip to content

Commit

Permalink
Allow user defined SQL planners to be registered (apache#11208)
Browse files Browse the repository at this point in the history
* Allow user defined SQL planners to be registered

* fix clippy, remove unused Default

* format
  • Loading branch information
samuelcolvin authored and findepi committed Jul 16, 2024
1 parent 756e350 commit a66ccad
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 8 deletions.
10 changes: 10 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
expr_rewriter::FunctionRewrite,
logical_plan::{DdlStatement, Statement},
planner::UserDefinedSQLPlanner,
Expr, UserDefinedLogicalNode, WindowUDF,
};

Expand Down Expand Up @@ -1390,6 +1391,15 @@ impl FunctionRegistry for SessionContext {
) -> Result<()> {
self.state.write().register_function_rewrite(rewrite)
}

/// Registers a [`UserDefinedSQLPlanner`] on this context by forwarding it to
/// the underlying session state.
fn register_user_defined_sql_planner(
    &mut self,
    user_defined_sql_planner: Arc<dyn UserDefinedSQLPlanner>,
) -> Result<()> {
    // Obtain write access to the state, then delegate to its implementation.
    let mut state = self.state.write();
    state.register_user_defined_sql_planner(user_defined_sql_planner)
}
}

/// Create a new task context instance from SessionContext
Expand Down
24 changes: 21 additions & 3 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::planner::UserDefinedSQLPlanner;
use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry};
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::var_provider::{is_system_variables, VarType};
Expand Down Expand Up @@ -99,6 +100,8 @@ pub struct SessionState {
session_id: String,
/// Responsible for analyzing and rewriting a logical plan before optimization
analyzer: Analyzer,
/// Provides support for customising the SQL planner, e.g. to add support for custom operators like `->>` or `?`
user_defined_sql_planners: Vec<Arc<dyn UserDefinedSQLPlanner>>,
/// Responsible for optimizing a logical plan
optimizer: Optimizer,
/// Responsible for optimizing a physical execution plan
Expand Down Expand Up @@ -231,6 +234,7 @@ impl SessionState {
let mut new_self = SessionState {
session_id,
analyzer: Analyzer::new(),
user_defined_sql_planners: vec![],
optimizer: Optimizer::new(),
physical_optimizers: PhysicalOptimizer::new(),
query_planner: Arc::new(DefaultQueryPlanner {}),
Expand Down Expand Up @@ -947,16 +951,21 @@ impl SessionState {
where
S: ContextProvider,
{
let query = SqlToRel::new_with_options(provider, self.get_parser_options());
let mut query = SqlToRel::new_with_options(provider, self.get_parser_options());

// custom planners are registered first, so they're run first and take precedence over built-in planners
for planner in self.user_defined_sql_planners.iter() {
query = query.with_user_defined_planner(planner.clone());
}

// register crate of array expressions (if enabled)
#[cfg(feature = "array_expressions")]
{
let array_planner =
Arc::new(functions_array::planner::ArrayFunctionPlanner::default()) as _;
Arc::new(functions_array::planner::ArrayFunctionPlanner) as _;

let field_access_planner =
Arc::new(functions_array::planner::FieldAccessPlanner::default()) as _;
Arc::new(functions_array::planner::FieldAccessPlanner) as _;

query
.with_user_defined_planner(array_planner)
Expand Down Expand Up @@ -1176,6 +1185,15 @@ impl FunctionRegistry for SessionState {
self.analyzer.add_function_rewrite(rewrite);
Ok(())
}

/// Appends `user_defined_sql_planner` to this state's list of custom SQL
/// planners.
///
/// Always returns `Ok(())`.
fn register_user_defined_sql_planner(
    &mut self,
    user_defined_sql_planner: Arc<dyn UserDefinedSQLPlanner>,
) -> datafusion_common::Result<()> {
    let planners = &mut self.user_defined_sql_planners;
    planners.push(user_defined_sql_planner);
    Ok(())
}
}

impl OptimizerConfig for SessionState {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/tests/user_defined/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ mod user_defined_window_functions;

/// Tests for User Defined Table Functions
mod user_defined_table_functions;

/// Tests for User Defined SQL Planner
mod user_defined_sql_planner;
88 changes: 88 additions & 0 deletions datafusion/core/tests/user_defined/user_defined_sql_planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use std::sync::Arc;

use datafusion::common::{assert_batches_eq, DFSchema};
use datafusion::error::Result;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::Operator;
use datafusion::prelude::*;
use datafusion::sql::sqlparser::ast::BinaryOperator;
use datafusion_expr::planner::{PlannerResult, RawBinaryExpr, UserDefinedSQLPlanner};
use datafusion_expr::BinaryExpr;

/// Test planner that gives custom meaning to the `->` and `->>` operators.
struct MyCustomPlanner;

impl UserDefinedSQLPlanner for MyCustomPlanner {
    /// Plans `->` as string concatenation and `->>` as addition.
    ///
    /// Any other operator is handed back untouched as
    /// [`PlannerResult::Original`] so that later planners can handle it.
    fn plan_binary_op(
        &self,
        expr: RawBinaryExpr,
        _schema: &DFSchema,
    ) -> Result<PlannerResult<RawBinaryExpr>> {
        // Pick the replacement operator first; the borrow of `expr.op` ends
        // with the match, so the operands can be moved out afterwards.
        let op = match &expr.op {
            BinaryOperator::Arrow => Operator::StringConcat,
            BinaryOperator::LongArrow => Operator::Plus,
            _ => return Ok(PlannerResult::Original(expr)),
        };

        // `expr` is owned, so move the operands instead of cloning them.
        Ok(PlannerResult::Planned(Expr::BinaryExpr(BinaryExpr {
            left: Box::new(expr.left),
            right: Box::new(expr.right),
            op,
        })))
    }
}

/// Runs `sql` on a fresh [`SessionContext`] that has [`MyCustomPlanner`]
/// registered, and returns the collected record batches.
async fn plan_and_collect(sql: &str) -> Result<Vec<RecordBatch>> {
    let mut session = SessionContext::new();
    let planner = Arc::new(MyCustomPlanner);
    session.register_user_defined_sql_planner(planner)?;

    let data_frame = session.sql(sql).await?;
    data_frame.collect().await
}

/// `->` is planned by [`MyCustomPlanner`] as string concatenation.
#[tokio::test]
async fn test_custom_operators_arrow() {
    let actual = plan_and_collect("select 'foo'->'bar';").await.unwrap();
    // Every line of the formatted table has the same width, so the data row
    // must be padded out to the width of the header and borders.
    let expected = [
        "+----------------------------+",
        "| Utf8(\"foo\") || Utf8(\"bar\") |",
        "+----------------------------+",
        "| foobar                     |",
        "+----------------------------+",
    ];
    assert_batches_eq!(&expected, &actual);
}

/// `->>` is planned by [`MyCustomPlanner`] as integer addition.
#[tokio::test]
async fn test_custom_operators_long_arrow() {
    let actual = plan_and_collect("select 1->>2;").await.unwrap();
    // Every line of the formatted table has the same width, so the data row
    // must be padded out to the width of the header and borders.
    let expected = [
        "+---------------------+",
        "| Int64(1) + Int64(2) |",
        "+---------------------+",
        "| 3                   |",
        "+---------------------+",
    ];
    assert_batches_eq!(&expected, &actual);
}
2 changes: 1 addition & 1 deletion datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub trait ContextProvider {
}

/// This trait allows users to customize the behavior of the SQL planner
pub trait UserDefinedSQLPlanner {
pub trait UserDefinedSQLPlanner: Send + Sync {
/// Plan the binary operation between two expressions; returns [`PlannerResult::Original`] if not possible
fn plan_binary_op(
&self,
Expand Down
9 changes: 9 additions & 0 deletions datafusion/expr/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! FunctionRegistry trait
use crate::expr_rewriter::FunctionRewrite;
use crate::planner::UserDefinedSQLPlanner;
use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
use datafusion_common::{not_impl_err, plan_datafusion_err, Result};
use std::collections::HashMap;
Expand Down Expand Up @@ -108,6 +109,14 @@ pub trait FunctionRegistry {
) -> Result<()> {
not_impl_err!("Registering FunctionRewrite")
}

/// Registers a new [`UserDefinedSQLPlanner`] with the registry.
///
/// The default implementation ignores the planner and evaluates
/// `not_impl_err!` with the message "Registering UserDefinedSQLPlanner".
fn register_user_defined_sql_planner(
&mut self,
_user_defined_sql_planner: Arc<dyn UserDefinedSQLPlanner>,
) -> Result<()> {
not_impl_err!("Registering UserDefinedSQLPlanner")
}
}

/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
Expand Down
6 changes: 2 additions & 4 deletions datafusion/functions-array/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use crate::{
make_array::make_array,
};

#[derive(Default)]
pub struct ArrayFunctionPlanner {}
pub struct ArrayFunctionPlanner;

impl UserDefinedSQLPlanner for ArrayFunctionPlanner {
fn plan_binary_op(
Expand Down Expand Up @@ -99,8 +98,7 @@ impl UserDefinedSQLPlanner for ArrayFunctionPlanner {
}
}

#[derive(Default)]
pub struct FieldAccessPlanner {}
pub struct FieldAccessPlanner;

impl UserDefinedSQLPlanner for FieldAccessPlanner {
fn plan_field_access(
Expand Down

0 comments on commit a66ccad

Please sign in to comment.