chore: bump datafusion version #1086

Merged: 5 commits, Jul 20, 2023
Changes from 3 commits
786 changes: 572 additions & 214 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
@@ -71,8 +71,8 @@ path = "src/bin/ceresdb-server.rs"

[workspace.dependencies]
alloc_tracker = { path = "components/alloc_tracker" }
arrow = { version = "38.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "38.0.0" }
arrow = { version = "43.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "43.0.0" }
arrow_ext = { path = "components/arrow_ext" }
analytic_engine = { path = "analytic_engine" }
arena = { path = "components/arena" }
@@ -91,8 +91,8 @@ clru = "0.6.1"
cluster = { path = "cluster" }
criterion = "0.3"
common_types = { path = "common_types" }
datafusion = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" }
datafusion-proto = { git = "https://github.com/ceresdb/arrow-datafusion.git", rev = "acb5d97a8a8de5296989740f97db3773fe3aa45a" }
datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "a6dcd943051a083693c352c6b4279156548490a0" }
datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "a6dcd943051a083693c352c6b4279156548490a0" }
df_operator = { path = "df_operator" }
future_cancel = { path = "components/future_cancel" }
etcd-client = "0.10.3"
@@ -106,10 +106,10 @@ log = "0.4"
logger = { path = "components/logger" }
lru = "0.7.6"
id_allocator = { path = "components/id_allocator" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql", rev = "b65a125b9cdfa3121a3c8843bc48441b91049e31", package = "schema" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "b4520c6d5dfb6d68e4c7671555050391292dbffe", package = "schema" }
interpreters = { path = "interpreters" }
itertools = "0.10.5"
macros = { path = "components/macros" }
@@ -121,7 +121,7 @@ panic_ext = { path = "components/panic_ext" }
partitioned_lock = { path = "components/partitioned_lock" }
partition_table_engine = { path = "partition_table_engine" }
parquet_ext = { path = "components/parquet_ext" }
parquet = { version = "38.0.0" }
parquet = { version = "43.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
profile = { path = "components/profile" }
@@ -145,7 +145,7 @@ size_ext = { path = "components/size_ext" }
smallvec = "1.6"
slog = "2.7"
spin = "0.9.6"
sqlparser = { version = "0.33", features = ["serde"] }
sqlparser = { version = "0.35", features = ["serde"] }
system_catalog = { path = "system_catalog" }
table_engine = { path = "table_engine" }
table_kv = { path = "components/table_kv" }
@@ -160,7 +160,7 @@ trace_metric = { path = "components/trace_metric" }
trace_metric_derive = { path = "components/trace_metric_derive" }
trace_metric_derive_tests = { path = "components/trace_metric_derive_tests" }
tonic = "0.8.1"
tokio = { version = "1.25", features = ["full"] }
tokio = { version = "1.29", features = ["full"] }
wal = { path = "wal" }
xorfilter-rs = { git = "https://github.com/CeresDB/xorfilter", rev = "ac8ef01" }
zstd = { version = "0.12", default-features = false }
2 changes: 2 additions & 0 deletions analytic_engine/src/sst/meta_data/mod.rs
@@ -1,5 +1,7 @@
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

// TODO: remove it and use the suggested api.
#[allow(deprecated)]
pub mod cache;

use std::sync::Arc;
10 changes: 6 additions & 4 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -19,11 +19,13 @@ use common_types::{
};
use datafusion::{
common::ToDFSchema,
datasource::physical_plan::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
// physical_plan::{
// file_format::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
// metrics::ExecutionPlanMetricsSet,
// },
physical_expr::{create_physical_expr, execution_props::ExecutionProps},
physical_plan::{
file_format::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
metrics::ExecutionPlanMetricsSet,
},
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use futures::{Stream, StreamExt};
use generic_error::{BoxError, GenericResult};
2 changes: 2 additions & 0 deletions analytic_engine/src/sst/parquet/mod.rs
@@ -2,6 +2,8 @@

//! Sst implementation based on parquet.

// TODO: remove it and use the suggested api.
#[allow(deprecated)]
pub mod async_reader;
pub mod encoding;
mod hybrid;
8 changes: 3 additions & 5 deletions common_types/src/column.rs
@@ -19,10 +19,7 @@ use arrow::{
error::ArrowError,
};
use bytes_ext::Bytes;
use datafusion::physical_plan::{
expressions::{cast_column, DEFAULT_DATAFUSION_CAST_OPTIONS},
ColumnarValue,
};
use datafusion::physical_plan::{expressions::cast_column, ColumnarValue};
use paste::paste;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};

@@ -858,7 +855,8 @@ pub fn cast_nanosecond_to_mills(array: &ArrayRef) -> Result<Arc<dyn Array>> {
let mills_column = cast_column(
&column,
&DataType::Timestamp(TimeUnit::Millisecond, None),
&DEFAULT_DATAFUSION_CAST_OPTIONS,
// It will use the default option internally when found None.
None,
)
.with_context(|| CastTimestamp {
data_type: DataType::Timestamp(TimeUnit::Millisecond, None),
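
Note on the cast_column change above: in this DataFusion/arrow release the function takes Option<&CastOptions> instead of a reference to the removed DEFAULT_DATAFUSION_CAST_OPTIONS constant, and passing None selects the default options. A minimal sketch of the new call shape (the to_millis helper is illustrative, not code from this PR):

use arrow::datatypes::{DataType, TimeUnit};
use datafusion::physical_plan::{expressions::cast_column, ColumnarValue};

// Cast a columnar value to millisecond timestamps; None picks DataFusion's
// default cast options, replacing the old DEFAULT_DATAFUSION_CAST_OPTIONS.
fn to_millis(column: &ColumnarValue) -> datafusion::error::Result<ColumnarValue> {
    cast_column(
        column,
        &DataType::Timestamp(TimeUnit::Millisecond, None),
        None,
    )
}
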
14 changes: 12 additions & 2 deletions common_types/src/datum.rs
@@ -1274,7 +1274,12 @@ impl Datum {
| ScalarValue::Struct(_, _)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_) => None,
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Fixedsizelist(_, _, _)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_) => None,
}
}
}
@@ -1318,7 +1323,12 @@ impl<'a> DatumView<'a> {
| ScalarValue::Struct(_, _)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_) => None,
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Fixedsizelist(_, _, _)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_) => None,
}
}
}
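
The extra match arms above exist because arrow 43 introduces FixedSizeList and Duration scalar kinds, and these exhaustive matches must now name them even though Datum has no mapping for them yet. The same check written as a standalone predicate, as a sketch (the helper name is hypothetical):

use datafusion::scalar::ScalarValue;

// ScalarValue variants added by the upgrade that Datum cannot represent yet;
// all of them fall through to None in the conversions above.
fn is_unsupported_scalar(value: &ScalarValue) -> bool {
    matches!(
        value,
        ScalarValue::Fixedsizelist(_, _, _)
            | ScalarValue::DurationSecond(_)
            | ScalarValue::DurationMillisecond(_)
            | ScalarValue::DurationMicrosecond(_)
            | ScalarValue::DurationNanosecond(_)
    )
}
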
14 changes: 7 additions & 7 deletions components/parquet_ext/src/prune/equal.rs
@@ -1,9 +1,9 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use arrow::datatypes::SchemaRef;
use datafusion::{
common::Column,
logical_expr::{Expr, Operator},
logical_expr::{expr::InList, Expr, Operator},
scalar::ScalarValue,
};

@@ -173,11 +173,11 @@ fn normalize_predicate_expression(expr: &Expr) -> NormalizedExpr {
Operator::NotEq => normalize_equal_expr(left, right, false),
_ => unhandled,
},
Expr::InList {
Expr::InList(InList {
expr,
list,
negated,
} if list.len() < MAX_ELEMS_IN_LIST_FOR_FILTER => {
}) if list.len() < MAX_ELEMS_IN_LIST_FOR_FILTER => {
if list.is_empty() {
if *negated {
// "not in empty list" is always true
@@ -344,7 +344,7 @@ mod tests {
Expr::not_eq(make_literal_expr(0), make_column_expr("c1")),
Expr::eq(make_literal_expr(1), make_column_expr("c2")),
),
Expr::not(make_column_expr("c3")),
!make_column_expr("c3"),
),
);

@@ -373,7 +373,7 @@
fn test_normalize_unhandled() {
let lt_expr = Expr::gt(make_column_expr("c0"), make_literal_expr(0));
let empty_list_expr = Expr::in_list(make_column_expr("c0"), vec![], true);
let not_expr = Expr::not(make_column_expr("c0"));
let not_expr = !make_column_expr("c0");

let unhandled_exprs = vec![lt_expr, empty_list_expr, not_expr];
let expect_expr = NormalizedExpr::True;
@@ -415,7 +415,7 @@
Expr::not_eq(make_literal_expr(0), make_column_expr("c1")),
Expr::eq(make_literal_expr(1), make_column_expr("c2")),
),
Expr::not(make_column_expr("c3")),
!make_column_expr("c3"),
),
);
assert!(EqPruner::new(&true_expr).prune(&f));
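
Two API shifts drive this file's changes: Expr::InList is now a struct variant wrapping logical_expr::expr::InList, and negation goes through the std::ops::Not impl on Expr rather than an Expr::not helper. A minimal sketch of building and destructuring such expressions under those assumptions:

use datafusion::logical_expr::{expr::InList, Expr};
use datafusion::prelude::{col, lit};

fn main() {
    // Build `c0 IN (1, 2)` and inspect it through the new struct variant.
    let e: Expr = col("c0").in_list(vec![lit(1i64), lit(2i64)], false);
    if let Expr::InList(InList { expr, list, negated }) = &e {
        assert_eq!(list.len(), 2);
        assert!(!*negated);
        let _ = expr;
    }

    // Negation now uses the Not operator implementation on Expr.
    let _not_c3: Expr = !col("c3");
}
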
4 changes: 2 additions & 2 deletions components/parquet_ext/src/reverse_reader.rs
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{collections::VecDeque, sync::Arc};

@@ -169,7 +169,7 @@ mod tests {
const TEST_BATCH_SIZE: usize = 1000;

fn check_reversed_row_iter(original: RowIter, reversed: ReversedFileReader) {
let mut original_reversed_rows: Vec<_> = original.into_iter().collect();
let mut original_reversed_rows: Vec<_> = original.into_iter().map(|v| v.unwrap()).collect();
original_reversed_rows.reverse();

let reversed_record_batches: Vec<_> = reversed
6 changes: 3 additions & 3 deletions df_operator/src/functions.rs
@@ -12,7 +12,7 @@ use common_types::{column::ColumnBlock, datum::DatumKind};
use datafusion::{
error::DataFusionError,
logical_expr::{
AccumulatorFunctionImplementation, ReturnTypeFunction, ScalarFunctionImplementation,
AccumulatorFactoryFunction, ReturnTypeFunction, ScalarFunctionImplementation,
Signature as DfSignature, StateTypeFunction, TypeSignature as DfTypeSignature, Volatility,
},
physical_plan::ColumnarValue as DfColumnarValue,
@@ -253,7 +253,7 @@ impl ScalarFunction {
pub struct AggregateFunction {
type_signature: TypeSignature,
return_type: ReturnType,
df_accumulator: AccumulatorFunctionImplementation,
df_accumulator: AccumulatorFactoryFunction,
state_type: Vec<DatumKind>,
}

@@ -301,7 +301,7 @@ impl AggregateFunction {
}

#[inline]
pub(crate) fn to_datafusion_accumulator(&self) -> AccumulatorFunctionImplementation {
pub(crate) fn to_datafusion_accumulator(&self) -> AccumulatorFactoryFunction {
self.df_accumulator.clone()
}

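
The rename above is mechanical: AccumulatorFunctionImplementation is now called AccumulatorFactoryFunction, while the factory closure keeps the same shape (result data type in, boxed Accumulator out). A sketch of a value of that type, assuming the alias signature from this DataFusion series; the stub simply returns an error:

use std::sync::Arc;

use arrow::datatypes::DataType;
use datafusion::{
    error::{DataFusionError, Result},
    logical_expr::{Accumulator, AccumulatorFactoryFunction},
};

// Placeholder factory: a real implementation would return Box<dyn Accumulator>.
fn example_factory() -> AccumulatorFactoryFunction {
    Arc::new(|_return_type: &DataType| -> Result<Box<dyn Accumulator>> {
        Err(DataFusionError::NotImplemented(
            "example accumulator".to_string(),
        ))
    })
}
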
2 changes: 1 addition & 1 deletion integration_tests/cases/common/dml/issue-59.result
@@ -25,7 +25,7 @@ GROUP BY id+1;

plan_type,plan,
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n Projection: issue59.id, issue59.account\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }, Column { name: \"alias1\", index: 1 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ProjectionExec: expr=[id@0 as id, account@1 as account]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ProjectionExec: expr=[id@0 as id, account@1 as account]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),


DROP TABLE IF EXISTS issue59;
4 changes: 1 addition & 3 deletions integration_tests/cases/common/dummy/select_1.result
@@ -22,9 +22,7 @@ Boolean(false),

SELECT NOT(1);

NOT Int64(1),
Int64(-2),

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: SELECT NOT(1);. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute select, err:Failed to execute logical plan, err:Failed to do logical optimization, err:DataFusion Failed to optimize logical plan, err:Optimizer rule 'simplify_expressions' failed\ncaused by\nInternal error: NOT 'Literal { value: Int64(1) }' can't be evaluated because the expression's type is Int64, not boolean or NULL. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker." })

SELECT TRUE;

3 changes: 1 addition & 2 deletions integration_tests/cases/common/dummy/select_1.sql
@@ -2,12 +2,11 @@ SELECT 1;

SELECT x;

-- FIXME
SELECT 'a';

SELECT NOT(1=1);

-- FIXME
-- Revert to return error in https://github.com/apache/arrow-datafusion/pull/6599
SELECT NOT(1);

SELECT TRUE;
2 changes: 1 addition & 1 deletion integration_tests/cases/common/optimizer/optimizer.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY

plan_type,plan,
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n Projection: 07_optimizer_t.name, 07_optimizer_t.value\n TableScan: 07_optimizer_t projection=[name, value]"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ProjectionExec: expr=[name@0 as name, value@1 as value]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ProjectionExec: expr=[name@0 as name, value@1 as value]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"),


DROP TABLE `07_optimizer_t`;
19 changes: 14 additions & 5 deletions query_engine/src/context.rs
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Query context

@@ -8,10 +8,17 @@ use common_types::request_id::RequestId;
use datafusion::{
execution::{context::SessionState, runtime_env::RuntimeEnv},
optimizer::{
analyzer::AnalyzerRule, common_subexpr_eliminate::CommonSubexprEliminate,
eliminate_limit::EliminateLimit, optimizer::OptimizerRule,
push_down_filter::PushDownFilter, push_down_limit::PushDownLimit,
push_down_projection::PushDownProjection, simplify_expressions::SimplifyExpressions,
analyzer::{
count_wildcard_rule::CountWildcardRule, inline_table_scan::InlineTableScan,
AnalyzerRule,
},
common_subexpr_eliminate::CommonSubexprEliminate,
eliminate_limit::EliminateLimit,
optimizer::OptimizerRule,
push_down_filter::PushDownFilter,
push_down_limit::PushDownLimit,
push_down_projection::PushDownProjection,
simplify_expressions::SimplifyExpressions,
single_distinct_to_groupby::SingleDistinctToGroupBy,
},
physical_optimizer::optimizer::PhysicalOptimizerRule,
@@ -108,8 +115,10 @@ impl Context {

fn analyzer_rules() -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
vec![
Arc::new(InlineTableScan::new()),
Arc::new(TypeConversion),
Arc::new(datafusion::optimizer::analyzer::type_coercion::TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
]
}
}
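
The analyzer list gains InlineTableScan and CountWildcardRule, which this DataFusion version expects to run before the optimizer passes that rely on type coercion. A sketch of how such a rule list is typically attached to the session, assuming the standard SessionState::with_analyzer_rules builder (the surrounding engine setup in this repo is not shown):

use std::sync::Arc;

use datafusion::execution::context::SessionState;
use datafusion::optimizer::analyzer::AnalyzerRule;

// Swap the session's analyzer rules for a custom ordering.
fn with_custom_analyzers(
    state: SessionState,
    rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
) -> SessionState {
    state.with_analyzer_rules(rules)
}
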
25 changes: 17 additions & 8 deletions query_engine/src/df_execution_extension/prom_align.rs
@@ -3,7 +3,9 @@
use std::{
any::Any,
collections::{hash_map, BTreeMap, HashMap, VecDeque},
fmt, mem,
fmt,
hash::Hash,
mem,
pin::Pin,
sync::Arc,
task::{Context, Poll},
@@ -24,7 +26,7 @@
execution::context::TaskContext,
physical_expr::PhysicalSortExpr,
physical_plan::{
repartition::RepartitionExec, ColumnarValue, DisplayFormatType, ExecutionPlan,
repartition::RepartitionExec, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
},
@@ -62,7 +64,7 @@
/// Refer to https://github.com/prometheus/prometheus/pull/1295
const PROMETHEUS_EXTRAPOLATION_THRESHOLD_COEFFICIENT: f64 = 1.1;

#[derive(Debug)]
#[derive(Debug, Hash)]
struct ExtractTsidExpr {}

impl fmt::Display for ExtractTsidExpr {
@@ -123,6 +125,11 @@ impl PhysicalExpr for ExtractTsidExpr {
) -> ArrowResult<Arc<dyn PhysicalExpr>> {
Ok(self)
}

fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) {
let mut s = state;
self.hash(&mut s);
}
}

/// Note: caller should ensure data[tail_index] is valid
Expand Down Expand Up @@ -234,6 +241,13 @@ impl ExecutionPlan for PromAlignExec {
}))
}

fn statistics(&self) -> Statistics {
// TODO(chenxiang)
Statistics::default()
}
}

impl DisplayAs for PromAlignExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
@@ -243,11 +257,6 @@ impl ExecutionPlan for PromAlignExec,
self.output_partitioning().partition_count(),
)
}

fn statistics(&self) -> Statistics {
// TODO(chenxiang)
Statistics::default()
}
}

struct PromAlignReader {
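
The hunks in this file track two trait changes: PhysicalExpr implementations must now provide dyn_hash (hence the added Hash derive and the forwarding method), and plan display moved out of ExecutionPlan into the separate DisplayAs trait, so fmt_as lives in its own impl block. A minimal sketch of the DisplayAs side, with MyExec as a stand-in type rather than anything from this PR:

use std::fmt;

use datafusion::physical_plan::{DisplayAs, DisplayFormatType};

struct MyExec;

// Every ExecutionPlan implementor now also needs a DisplayAs impl; the body is
// what fmt_as used to do when it was part of ExecutionPlan itself.
impl DisplayAs for MyExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "MyExec")
    }
}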