chore: bump datafusion version #1086

Merged · 5 commits · Jul 20, 2023

786 changes: 572 additions & 214 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
@@ -71,8 +71,8 @@ path = "src/bin/ceresdb-server.rs"

[workspace.dependencies]
alloc_tracker = { path = "components/alloc_tracker" }
arrow = { version = "38.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "38.0.0" }
arrow = { version = "43.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "43.0.0" }
arrow_ext = { path = "components/arrow_ext" }
analytic_engine = { path = "analytic_engine" }
arena = { path = "components/arena" }
@@ -91,8 +91,8 @@ clru = "0.6.1"
cluster = { path = "cluster" }
criterion = "0.3"
common_types = { path = "common_types" }
datafusion = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" }
datafusion-proto = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" }
datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "a6dcd943051a083693c352c6b4279156548490a0" }
datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "a6dcd943051a083693c352c6b4279156548490a0" }
df_operator = { path = "df_operator" }
future_cancel = { path = "components/future_cancel" }
etcd-client = "0.10.3"
@@ -106,10 +106,10 @@ log = "0.4"
logger = { path = "components/logger" }
lru = "0.7.6"
id_allocator = { path = "components/id_allocator" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "schema" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "schema" }
interpreters = { path = "interpreters" }
itertools = "0.10.5"
macros = { path = "components/macros" }
@@ -121,7 +121,7 @@ panic_ext = { path = "components/panic_ext" }
partitioned_lock = { path = "components/partitioned_lock" }
partition_table_engine = { path = "partition_table_engine" }
parquet_ext = { path = "components/parquet_ext" }
parquet = { version = "38.0.0" }
parquet = { version = "43.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
profile = { path = "components/profile" }
@@ -145,7 +145,7 @@ size_ext = { path = "components/size_ext" }
smallvec = "1.6"
slog = "2.7"
spin = "0.9.6"
sqlparser = { version = "0.33", features = ["serde"] }
sqlparser = { version = "0.35", features = ["serde"] }
system_catalog = { path = "system_catalog" }
table_engine = { path = "table_engine" }
table_kv = { path = "components/table_kv" }
@@ -160,7 +160,7 @@ trace_metric = { path = "components/trace_metric" }
trace_metric_derive = { path = "components/trace_metric_derive" }
trace_metric_derive_tests = { path = "components/trace_metric_derive_tests" }
tonic = "0.8.1"
tokio = { version = "1.25", features = ["full"] }
tokio = { version = "1.29", features = ["full"] }
wal = { path = "wal" }
xorfilter-rs = { git = "https://github.com/CeresDB/xorfilter", rev = "ac8ef01" }
zstd = { version = "0.12", default-features = false }
4 changes: 4 additions & 0 deletions analytic_engine/src/sst/meta_data/cache.rs
@@ -31,6 +31,8 @@ impl MetaData {
///
/// After the building, a new parquet meta data will be generated which
/// contains no extended custom information.
// TODO: remove it and use the suggested api.
#[allow(deprecated)]
pub fn try_new(
parquet_meta_data: &parquet_ext::ParquetMetaData,
ignore_sst_filter: bool,
@@ -147,6 +149,8 @@ mod tests {
use super::MetaData;
use crate::sst::parquet::{encoding, meta_data::ParquetMetaData as CustomParquetMetaData};

// TODO: remove it and use the suggested api.
#[allow(deprecated)]
fn check_parquet_meta_data(original: &ParquetMetaData, processed: &ParquetMetaData) {
assert_eq!(original.page_indexes(), processed.page_indexes());
assert_eq!(original.offset_indexes(), processed.offset_indexes());
8 changes: 4 additions & 4 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -19,11 +19,9 @@ use common_types::{
};
use datafusion::{
common::ToDFSchema,
datasource::physical_plan::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
physical_expr::{create_physical_expr, execution_props::ExecutionProps},
physical_plan::{
file_format::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
metrics::ExecutionPlanMetricsSet,
},
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use futures::{Stream, StreamExt};
use generic_error::{BoxError, GenericResult};
@@ -227,6 +225,8 @@ impl<'a> Reader<'a> {
.context(DataFusionError)
}

// TODO: remove it and use the suggested api.
#[allow(deprecated)]
async fn fetch_record_batch_streams(
&mut self,
suggested_parallelism: usize,
8 changes: 3 additions & 5 deletions common_types/src/column.rs
@@ -19,10 +19,7 @@ use arrow::{
error::ArrowError,
};
use bytes_ext::Bytes;
use datafusion::physical_plan::{
expressions::{cast_column, DEFAULT_DATAFUSION_CAST_OPTIONS},
ColumnarValue,
};
use datafusion::physical_plan::{expressions::cast_column, ColumnarValue};
use paste::paste;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};

@@ -858,7 +855,8 @@ pub fn cast_nanosecond_to_mills(array: &ArrayRef) -> Result<Arc<dyn Array>> {
let mills_column = cast_column(
&column,
&DataType::Timestamp(TimeUnit::Millisecond, None),
&DEFAULT_DATAFUSION_CAST_OPTIONS,
// It will use the default option internally when found None.
None,
)
.with_context(|| CastTimestamp {
data_type: DataType::Timestamp(TimeUnit::Millisecond, None),
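For reference, a minimal standalone sketch of the new cast_column call shape. The optional third argument replaces the removed DEFAULT_DATAFUSION_CAST_OPTIONS constant; the wrapper function name and the use of DataFusion's standard Result type are illustrative assumptions, not code from this PR.

// Sketch only: passing `None` lets DataFusion apply its default cast options
// internally, matching the comment added in the hunk above.
use arrow::datatypes::{DataType, TimeUnit};
use datafusion::physical_plan::{expressions::cast_column, ColumnarValue};

fn cast_to_millis(column: &ColumnarValue) -> datafusion::error::Result<ColumnarValue> {
    cast_column(
        column,
        &DataType::Timestamp(TimeUnit::Millisecond, None),
        None, // default cast options are applied internally when None is given
    )
}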
14 changes: 12 additions & 2 deletions common_types/src/datum.rs
@@ -1274,7 +1274,12 @@ impl Datum {
| ScalarValue::Struct(_, _)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_) => None,
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Fixedsizelist(_, _, _)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_) => None,
}
}
}
@@ -1318,7 +1323,12 @@ impl<'a> DatumView<'a> {
| ScalarValue::Struct(_, _)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_) => None,
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Fixedsizelist(_, _, _)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_) => None,
}
}
}
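The new ScalarValue variants above (Fixedsizelist and the four Duration kinds) must be named explicitly because the match is exhaustive; CeresDB does not model them, so they all map to None. Below is a small illustrative sketch of how the duration variants carry their payloads — the helper is hypothetical and not part of this PR.

// Sketch: the duration variants added in this DataFusion release wrap Option<i64>
// payloads; unsupported or null values fall through to None.
use datafusion::scalar::ScalarValue;

fn duration_nanos(value: &ScalarValue) -> Option<i64> {
    match value {
        ScalarValue::DurationSecond(Some(v)) => Some(v * 1_000_000_000),
        ScalarValue::DurationMillisecond(Some(v)) => Some(v * 1_000_000),
        ScalarValue::DurationMicrosecond(Some(v)) => Some(v * 1_000),
        ScalarValue::DurationNanosecond(Some(v)) => Some(*v),
        _ => None, // any other variant (or a null duration) is not handled here
    }
}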
14 changes: 7 additions & 7 deletions components/parquet_ext/src/prune/equal.rs
@@ -1,9 +1,9 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use arrow::datatypes::SchemaRef;
use datafusion::{
common::Column,
logical_expr::{Expr, Operator},
logical_expr::{expr::InList, Expr, Operator},
scalar::ScalarValue,
};

@@ -173,11 +173,11 @@ fn normalize_predicate_expression(expr: &Expr) -> NormalizedExpr {
Operator::NotEq => normalize_equal_expr(left, right, false),
_ => unhandled,
},
Expr::InList {
Expr::InList(InList {
expr,
list,
negated,
} if list.len() < MAX_ELEMS_IN_LIST_FOR_FILTER => {
}) if list.len() < MAX_ELEMS_IN_LIST_FOR_FILTER => {
if list.is_empty() {
if *negated {
// "not in empty list" is always true
@@ -344,7 +344,7 @@ mod tests {
Expr::not_eq(make_literal_expr(0), make_column_expr("c1")),
Expr::eq(make_literal_expr(1), make_column_expr("c2")),
),
Expr::not(make_column_expr("c3")),
!make_column_expr("c3"),
),
);

@@ -373,7 +373,7 @@
fn test_normalize_unhandled() {
let lt_expr = Expr::gt(make_column_expr("c0"), make_literal_expr(0));
let empty_list_expr = Expr::in_list(make_column_expr("c0"), vec![], true);
let not_expr = Expr::not(make_column_expr("c0"));
let not_expr = !make_column_expr("c0");

let unhandled_exprs = vec![lt_expr, empty_list_expr, not_expr];
let expect_expr = NormalizedExpr::True;
@@ -415,7 +415,7 @@ mod tests {
Expr::not_eq(make_literal_expr(0), make_column_expr("c1")),
Expr::eq(make_literal_expr(1), make_column_expr("c2")),
),
Expr::not(make_column_expr("c3")),
!make_column_expr("c3"),
),
);
assert!(EqPruner::new(&true_expr).prune(&f));
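Two expression-API changes run through this file: Expr::InList is now a tuple variant wrapping a dedicated InList struct, and the tests negate with the ! operator rather than Expr::not(...). A minimal standalone sketch, assuming the bumped DataFusion (the helper names below are illustrative):

// Sketch: matching the new InList shape and negating an expression with `!`.
use datafusion::logical_expr::{col, expr::InList, lit, Expr};

fn is_small_in_list(expr: &Expr, max_elems: usize) -> bool {
    matches!(expr, Expr::InList(InList { list, .. }) if list.len() < max_elems)
}

fn example() -> (Expr, bool) {
    let in_list_expr = col("c0").in_list(vec![lit(1), lit(2)], false);
    let negated = !col("c3"); // the `!` form used by the updated tests above
    (negated, is_small_in_list(&in_list_expr, 8))
}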
4 changes: 2 additions & 2 deletions components/parquet_ext/src/reverse_reader.rs
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{collections::VecDeque, sync::Arc};

@@ -169,7 +169,7 @@ mod tests {
const TEST_BATCH_SIZE: usize = 1000;

fn check_reversed_row_iter(original: RowIter, reversed: ReversedFileReader) {
let mut original_reversed_rows: Vec<_> = original.into_iter().collect();
let mut original_reversed_rows: Vec<_> = original.into_iter().map(|v| v.unwrap()).collect();
original_reversed_rows.reverse();

let reversed_record_batches: Vec<_> = reversed
6 changes: 3 additions & 3 deletions df_operator/src/functions.rs
@@ -12,7 +12,7 @@ use common_types::{column::ColumnBlock, datum::DatumKind};
use datafusion::{
error::DataFusionError,
logical_expr::{
AccumulatorFunctionImplementation, ReturnTypeFunction, ScalarFunctionImplementation,
AccumulatorFactoryFunction, ReturnTypeFunction, ScalarFunctionImplementation,
Signature as DfSignature, StateTypeFunction, TypeSignature as DfTypeSignature, Volatility,
},
physical_plan::ColumnarValue as DfColumnarValue,
@@ -253,7 +253,7 @@ impl ScalarFunction {
pub struct AggregateFunction {
type_signature: TypeSignature,
return_type: ReturnType,
df_accumulator: AccumulatorFunctionImplementation,
df_accumulator: AccumulatorFactoryFunction,
state_type: Vec<DatumKind>,
}

@@ -301,7 +301,7 @@ impl AggregateFunction {
}

#[inline]
pub(crate) fn to_datafusion_accumulator(&self) -> AccumulatorFunctionImplementation {
pub(crate) fn to_datafusion_accumulator(&self) -> AccumulatorFactoryFunction {
self.df_accumulator.clone()
}

2 changes: 1 addition & 1 deletion integration_tests/cases/common/dml/issue-59.result
@@ -25,7 +25,7 @@ GROUP BY id+1;

plan_type,plan,
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n Projection: issue59.id, issue59.account\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }, Column { name: \"alias1\", index: 1 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ProjectionExec: expr=[id@0 as id, account@1 as account]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ProjectionExec: expr=[id@0 as id, account@1 as account]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),


DROP TABLE IF EXISTS issue59;
6 changes: 0 additions & 6 deletions integration_tests/cases/common/dummy/select_1.result
@@ -20,12 +20,6 @@ NOT Int64(1) = Int64(1),
Boolean(false),


SELECT NOT(1);

NOT Int64(1),
Int64(-2),


SELECT TRUE;

Boolean(true),
4 changes: 0 additions & 4 deletions integration_tests/cases/common/dummy/select_1.sql
@@ -2,14 +2,10 @@ SELECT 1;

SELECT x;

-- FIXME
SELECT 'a';

SELECT NOT(1=1);

-- FIXME
SELECT NOT(1);

SELECT TRUE;

SELECT FALSE;
2 changes: 1 addition & 1 deletion integration_tests/cases/common/optimizer/optimizer.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY

plan_type,plan,
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n Projection: 07_optimizer_t.name, 07_optimizer_t.value\n TableScan: 07_optimizer_t projection=[name, value]"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ProjectionExec: expr=[name@0 as name, value@1 as value]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ProjectionExec: expr=[name@0 as name, value@1 as value]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"),


DROP TABLE `07_optimizer_t`;
Expand Down
19 changes: 14 additions & 5 deletions query_engine/src/context.rs
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Query context

@@ -8,10 +8,17 @@ use common_types::request_id::RequestId;
use datafusion::{
execution::{context::SessionState, runtime_env::RuntimeEnv},
optimizer::{
analyzer::AnalyzerRule, common_subexpr_eliminate::CommonSubexprEliminate,
eliminate_limit::EliminateLimit, optimizer::OptimizerRule,
push_down_filter::PushDownFilter, push_down_limit::PushDownLimit,
push_down_projection::PushDownProjection, simplify_expressions::SimplifyExpressions,
analyzer::{
count_wildcard_rule::CountWildcardRule, inline_table_scan::InlineTableScan,
AnalyzerRule,
},
common_subexpr_eliminate::CommonSubexprEliminate,
eliminate_limit::EliminateLimit,
optimizer::OptimizerRule,
push_down_filter::PushDownFilter,
push_down_limit::PushDownLimit,
push_down_projection::PushDownProjection,
simplify_expressions::SimplifyExpressions,
single_distinct_to_groupby::SingleDistinctToGroupBy,
},
physical_optimizer::optimizer::PhysicalOptimizerRule,
@@ -108,8 +115,10 @@ impl Context {

fn analyzer_rules() -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
vec![
Arc::new(InlineTableScan::new()),
Arc::new(TypeConversion),
Arc::new(datafusion::optimizer::analyzer::type_coercion::TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
]
}
}
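For reference, a minimal standalone sketch of the upstream analyzer rules this hunk wires in. CeresDB's own TypeConversion rule is omitted because it is defined elsewhere in this crate, and handing the list to the session state (presumably via something like SessionState::with_analyzer_rules) is an assumption not shown in this diff.

// Sketch: the stock DataFusion analyzer rules used by the updated
// analyzer_rules() above, in the same relative order.
use std::sync::Arc;
use datafusion::optimizer::analyzer::{
    count_wildcard_rule::CountWildcardRule, inline_table_scan::InlineTableScan,
    type_coercion::TypeCoercion, AnalyzerRule,
};

fn upstream_analyzer_rules() -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
    vec![
        Arc::new(InlineTableScan::new()),
        Arc::new(TypeCoercion::new()),
        Arc::new(CountWildcardRule::new()),
    ]
}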