Commit 3913f7d
chore: upgrade to datafusion 24 (#894)
1 parent 0c2dc8e commit 3913f7d

19 files changed, +257 -203 lines changed

Cargo.lock

+123 -111 (generated file; diff not rendered)

Cargo.toml

+10-6
@@ -56,8 +56,8 @@ name = "ceresdb-server"
 path = "src/bin/ceresdb-server.rs"
 
 [workspace.dependencies]
-arrow = { version = "36.0.0", features = ["prettyprint"] }
-arrow_ipc = { version = "36.0.0" }
+arrow = { version = "38.0.0", features = ["prettyprint"] }
+arrow_ipc = { version = "38.0.0" }
 arrow_ext = { path = "components/arrow_ext" }
 analytic_engine = { path = "analytic_engine" }
 arena = { path = "components/arena" }
@@ -76,8 +76,8 @@ cluster = { path = "cluster" }
 criterion = "0.3"
 common_types = { path = "common_types" }
 common_util = { path = "common_util" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b87871fdd1f4ce64201eb1f7c79a0547627f37e9" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b87871fdd1f4ce64201eb1f7c79a0547627f37e9" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "06e9f53637f20dd91bef43b74942ec36c38c22d5" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "06e9f53637f20dd91bef43b74942ec36c38c22d5" }
 df_operator = { path = "df_operator" }
 etcd-client = "0.10.3"
 env_logger = "0.6"
@@ -89,13 +89,17 @@ lazy_static = "1.4.0"
 log = "0.4"
 logger = { path = "components/logger" }
 lru = "0.7.6"
+influxql-logical-planner = { git = "https://github.com/CeresDB/influxql", package = "iox_query_influxql" }
+influxql-parser = { git = "https://github.com/CeresDB/influxql", package = "influxdb_influxql_parser" }
+influxql-query = { git = "https://github.com/CeresDB/influxql", package = "iox_query" }
+influxql-schema = { git = "https://github.com/CeresDB/influxql", package = "schema" }
 interpreters = { path = "interpreters" }
 itertools = "0.10.5"
 meta_client = { path = "meta_client" }
 object_store = { path = "components/object_store" }
 partition_table_engine = { path = "partition_table_engine" }
 parquet_ext = { path = "components/parquet_ext" }
-parquet = { version = "36.0.0" }
+parquet = { version = "38.0.0" }
 paste = "1.0"
 pin-project-lite = "0.2.8"
 profile = { path = "components/profile" }
@@ -117,7 +121,7 @@ smallvec = "1.6"
 slog = "2.7"
 spin = "0.9.6"
 query_frontend = { path = "query_frontend" }
-sqlparser = { version = "0.32", features = ["serde"] }
+sqlparser = { version = "0.33", features = ["serde"] }
 system_catalog = { path = "system_catalog" }
 table_engine = { path = "table_engine" }
 table_kv = { path = "components/table_kv" }
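Most of the code churn below follows from this arrow 36 -> 38 bump: fields in schemas and nested types became reference-counted (Arc<Field>, collected into a Fields container), and borrowed ArrayData accessors were replaced by owned ones. A minimal sketch of the new shapes, assuming the arrow 38 API (the "tags"/"item" field names are illustrative, not from this commit):

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Fields, Schema};

fn list_of_utf8_schema() -> Schema {
    // DataType::List now wraps Arc<Field> rather than Box<Field>.
    let item = Arc::new(Field::new("item", DataType::Utf8, true));
    let list_field = Field::new("tags", DataType::List(item), true);
    // Schema::new accepts anything convertible into Fields (a shared Vec<Arc<Field>>).
    Schema::new(Fields::from(vec![Arc::new(list_field)]))
}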

analytic_engine/src/sst/parquet/encoding.rs

+11-8
@@ -526,7 +526,7 @@ impl HybridRecordDecoder {
             .iter()
             .map(|f| {
                 if let DataType::List(nested_field) = f.data_type() {
-                    Field::new(f.name(), nested_field.data_type().clone(), true)
+                    Arc::new(Field::new(f.name(), nested_field.data_type().clone(), true))
                 } else {
                     f.clone()
                 }
@@ -554,9 +554,10 @@ impl HybridRecordDecoder {
         assert_eq!(array_ref.len() + 1, value_offsets.len());
 
         let values_num = *value_offsets.last().unwrap() as usize;
-        let offset_slices = array_ref.data().buffers()[0].as_slice();
-        let value_slices = array_ref.data().buffers()[1].as_slice();
-        let nulls = array_ref.data().nulls();
+        let array_data = array_ref.to_data();
+        let offset_slices = array_data.buffers()[0].as_slice();
+        let value_slices = array_data.buffers()[1].as_slice();
+        let nulls = array_data.nulls();
         trace!(
             "raw buffer slice, offsets:{:#02x?}, values:{:#02x?}",
             offset_slices,
@@ -630,8 +631,9 @@ impl HybridRecordDecoder {
         assert!(!value_offsets.is_empty());
 
         let values_num = *value_offsets.last().unwrap() as usize;
-        let old_values_buffer = array_ref.data().buffers()[0].as_slice();
-        let old_nulls = array_ref.data().nulls();
+        let array_data = array_ref.to_data();
+        let old_values_buffer = array_data.buffers()[0].as_slice();
+        let old_nulls = array_data.nulls();
 
         let mut new_values_buffer = MutableBuffer::new(value_size * values_num);
         let mut new_null_buffer = hybrid::new_ones_buffer(values_num);
@@ -683,7 +685,8 @@ impl RecordDecoder for HybridRecordDecoder {
         let mut value_offsets = None;
         // Find value offsets from the first col in collapsible_cols_idx.
         if let Some(idx) = self.collapsible_cols_idx.first() {
-            let offset_slices = arrays[*idx as usize].data().buffers()[0].as_slice();
+            let array_data = arrays[*idx as usize].to_data();
+            let offset_slices = array_data.buffers()[0].as_slice();
             value_offsets = Some(Self::get_array_offsets(offset_slices));
         } else {
             CollapsibleColsIdxEmpty.fail()?;
@@ -703,7 +706,7 @@ impl RecordDecoder for HybridRecordDecoder {
             // are collapsed by hybrid storage format, to differentiate
             // List column in original records
             DataType::List(_nested_field) => {
                 Ok(make_array(array_ref.to_data().child_data()[0].clone()))
             }
             _ => {
                 let datum_kind = DatumKind::from_data_type(data_type).unwrap();
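A pattern repeated throughout these hunks: arrow 38 replaced the borrowed ArrayData accessor (array.data()) with to_data(), which returns ArrayData by value, so it must be bound to a local before slicing its buffers. A minimal sketch under that assumption (the helper itself is illustrative):

use arrow::array::{Array, ArrayRef};

fn first_buffer_len(array_ref: &ArrayRef) -> usize {
    // to_data() returns an owned ArrayData; the binding keeps the
    // borrow of buffers()[0] valid for the next line.
    let array_data = array_ref.to_data();
    array_data.buffers()[0].len()
}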

analytic_engine/src/sst/parquet/hybrid.rs

+8-8
@@ -74,12 +74,12 @@ impl ArrayHandle {
     }
 
     // Note: this require primitive array
-    fn data_slice(&self) -> &[u8] {
-        self.array.data().buffers()[0].as_slice()
+    fn data_slice(&self) -> Vec<u8> {
+        self.array.to_data().buffers()[0].as_slice().to_vec()
     }
 
     fn nulls(&self) -> Option<&NullBuffer> {
-        self.array.data().nulls()
+        self.array.nulls()
     }
 }
 
@@ -122,12 +122,12 @@ pub fn build_hybrid_arrow_schema(schema: &Schema) -> ArrowSchemaRef {
         .enumerate()
         .map(|(idx, field)| {
             if schema.is_collapsible_column(idx) {
-                let field_type = DataType::List(Box::new(Field::new(
+                let field_type = DataType::List(Arc::new(Field::new(
                     LIST_ITEM_NAME,
                     field.data_type().clone(),
                     true,
                 )));
-                Field::new(field.name(), field_type, true)
+                Arc::new(Field::new(field.name(), field_type, true))
             } else {
                 field.clone()
             }
@@ -418,7 +418,7 @@ impl ListArrayBuilder {
         let array_len = self.multi_row_arrays.len();
         let mut offsets = MutableBuffer::new(array_len * std::mem::size_of::<i32>());
         let child_data = self.build_child_data(&mut offsets)?;
-        let field = Box::new(Field::new(
+        let field = Arc::new(Field::new(
             LIST_ITEM_NAME,
             self.datum_kind.to_arrow_data_type(),
             true,
@@ -731,14 +731,14 @@ mod tests {
         let string_data =
             string_array(vec![Some("bb"), None, Some("ccc"), Some("eeee"), Some("a")]);
         let offsets: [i32; 3] = [0, 4, 5];
-        let array_data = ArrayData::builder(DataType::List(Box::new(Field::new(
+        let array_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
             LIST_ITEM_NAME,
             DataType::Utf8,
             true,
         ))))
         .len(2)
         .add_buffer(Buffer::from_slice_ref(offsets))
-        .add_child_data(string_data.data().to_owned())
+        .add_child_data(string_data.to_data())
         .build()
         .unwrap();
         let expected = ListArray::from(array_data);
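The signature change on data_slice is forced by the same API: a slice borrowed from the ArrayData that to_data() returns cannot outlive the function, so the bytes are copied into a Vec<u8>. By contrast, nulls() is now available directly on the Array trait, which lets that accessor avoid materializing ArrayData at all. A hedged sketch of the lifetime constraint (illustrative, assuming arrow 38):

use arrow::array::{Array, ArrayRef};

// Returning &[u8] would not compile here: the ArrayData from to_data()
// is a local value that is dropped when the function returns.
fn data_slice(array: &ArrayRef) -> Vec<u8> {
    array.to_data().buffers()[0].as_slice().to_vec()
}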

common_types/src/column.rs

+3-3
@@ -346,7 +346,7 @@ macro_rules! impl_from_array_and_slice {
             // the underlying vector of [arrow::buffer::Buffer] and Bitmap (also
             // holds a Buffer), thus require some allocation. However, the Buffer is
             // managed by Arc, so cloning the buffer is not too expensive.
-            let array_data = array_ref.data().clone();
+            let array_data = array_ref.into_data();
             let array = $ArrayType::from(array_data);
 
             Self(array)
@@ -356,7 +356,7 @@ macro_rules! impl_from_array_and_slice {
         impl $Column {
             fn to_arrow_array(&self) -> $ArrayType {
                 // Clone the array data.
-                let array_data = self.0.data().clone();
+                let array_data = self.0.clone().into_data();
                 $ArrayType::from(array_data)
             }
 
@@ -367,7 +367,7 @@ macro_rules! impl_from_array_and_slice {
             fn slice(&self, offset: usize, length: usize) -> Self {
                 let array_slice = self.0.slice(offset, length);
                 // Clone the slice data.
-                let array_data = array_slice.data().clone();
+                let array_data = array_slice.into_data();
                 let array = $ArrayType::from(array_data);
 
                 Self(array)
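The accessor choice in these hunks, assuming arrow 38 semantics: into_data() consumes the array and moves its Arc-managed buffers out without copying, while clone().into_data() first bumps the reference counts. In neither case are buffer contents copied, which matches the cost analysis in the comment above. A small illustrative sketch:

use arrow::array::{Array, Int64Array};

fn round_trip(array: Int64Array) -> Int64Array {
    // Consumes `array`; buffers are handed over, not deep-copied.
    let array_data = array.into_data();
    Int64Array::from(array_data)
}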

common_types/src/column_schema.rs

+3-3
@@ -2,7 +2,7 @@
 
 //! Schema of column
 
-use std::{collections::HashMap, convert::TryFrom, str::FromStr};
+use std::{collections::HashMap, convert::TryFrom, str::FromStr, sync::Arc};
 
 use arrow::datatypes::{DataType, Field};
 use ceresdbproto::schema as schema_pb;
@@ -280,10 +280,10 @@ impl TryFrom<schema_pb::ColumnSchema> for ColumnSchema {
     }
 }
 
-impl TryFrom<&Field> for ColumnSchema {
+impl TryFrom<&Arc<Field>> for ColumnSchema {
     type Error = Error;
 
-    fn try_from(field: &Field) -> Result<Self> {
+    fn try_from(field: &Arc<Field>) -> Result<Self> {
         let ArrowFieldMeta {
             id,
             is_tag,

common_types/src/datum.rs

+1-1
@@ -977,7 +977,7 @@ pub mod arrow_convert {
             | DataType::LargeBinary
             | DataType::FixedSizeBinary(_)
             | DataType::Struct(_)
-            | DataType::Union(_, _, _)
+            | DataType::Union(_, _)
             | DataType::List(_)
             | DataType::LargeList(_)
             | DataType::FixedSizeList(_, _)
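One-character churn, but worth a note: arrow 38 folds the union's type ids into its fields payload, so DataType::Union carries two values (fields-with-type-ids plus mode) instead of three. Sketch of the new match shape, assuming that definition:

use arrow::datatypes::DataType;

fn is_union(data_type: &DataType) -> bool {
    // Two payloads now: the union fields (which carry the type ids) and the mode.
    matches!(data_type, DataType::Union(_, _))
}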

common_types/src/record_batch.rs

+1-1
@@ -325,7 +325,7 @@ fn cast_arrow_record_batch(source: ArrowRecordBatch) -> Result<ArrowRecordBatch>
         })
         .collect::<Vec<_>>();
     let mills_schema = Schema {
-        fields: mills_fileds,
+        fields: mills_fileds.into(),
         metadata: schema.metadata().clone(),
     };
     let result =
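This .into() and the .collect::<Vec<_>>() turbofish in the next two files are the same migration seen from two angles: schema constructors now accept anything convertible into a Fields collection, and because Fields implements FromIterator itself, a bare .collect() no longer has a unique target type. A sketch under that assumption (field names illustrative):

use arrow::datatypes::{DataType, Field, Schema};

fn build_schema(names: &[&str]) -> Schema {
    let fields = names
        .iter()
        .map(|name| Field::new(*name, DataType::Int64, false))
        .collect::<Vec<_>>(); // pin the intermediate type explicitly
    // Vec<Field> converts into Fields via Into, as mills_fileds.into() does above.
    Schema::new(fields)
}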

common_types/src/schema.rs

+6-2
@@ -434,7 +434,7 @@ impl RecordSchema {
             .columns
             .iter()
             .map(|col| col.to_arrow_field())
-            .collect();
+            .collect::<Vec<_>>();
         // Build arrow schema.
         let arrow_schema = Arc::new(ArrowSchema::new_with_metadata(
             fields,
@@ -1222,7 +1222,11 @@ impl Builder {
             );
         }
 
-        let fields = self.columns.iter().map(|c| c.to_arrow_field()).collect();
+        let fields = self
+            .columns
+            .iter()
+            .map(|c| c.to_arrow_field())
+            .collect::<Vec<_>>();
         let meta = self.build_arrow_schema_meta();
 
         Ok(Schema {

components/parquet_ext/src/prune/min_max.rs

+1-1
@@ -210,7 +210,7 @@ mod test {
         let fields = fields
             .into_iter()
             .map(|(name, data_type)| ArrowField::new(name, data_type, false))
-            .collect();
+            .collect::<Vec<_>>();
         Arc::new(ArrowSchema::new(fields))
     }
 

integration_tests/build_meta.sh

+8-5
@@ -2,11 +2,14 @@
 
 set -exo
 
-if [ -d ceresmeta ]; then
+SRC=/tmp/ceresmeta-src
+TARGET=$(pwd)/ceresmeta
+
+if [ -d $SRC ]; then
   echo "Remove old meta..."
-  rm -r ceresmeta
+  rm -rf $SRC
 fi
 
-git clone --depth 1 https://github.com/ceresdb/ceresmeta.git
-cd ceresmeta
-go build -o ceresmeta ./cmd/meta/...
+git clone --depth 1 https://github.com/ceresdb/ceresmeta.git ${SRC}
+cd ${SRC}
+go build -o ${TARGET}/ceresmeta ./cmd/meta/...
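The effect of the rewrite, as far as the script shows: the ceresmeta checkout now lives in /tmp/ceresmeta-src and only the built binary lands under ./ceresmeta in the original working directory, so repeated runs rebuild from a throwaway clone instead of deleting a source tree inside the repo.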

integration_tests/cases/common/dummy/select_1.result

+4-2
@@ -6,7 +6,7 @@ Int64(1),
 
 SELECT x;
 
-Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SELECT x;. Caused by: Failed to create plan, err:Failed to generate datafusion plan, err:Schema error: No field named \"x\"." })
+Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SELECT x;. Caused by: Failed to create plan, err:Failed to generate datafusion plan, err:Schema error: No field named x." })
 
 SELECT 'a';
 
@@ -22,7 +22,9 @@ Boolean(false),
 
 SELECT NOT(1);
 
-Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: SELECT NOT(1);. Caused by: Failed to execute select, err:Failed to execute logical plan, err:Failed to collect record batch stream, err:Stream error, msg:convert from arrow record batch, err:Internal error: NOT 'Literal { value: Int64(1) }' can't be evaluated because the expression's type is Int64, not boolean or NULL. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker" })
+NOT Int64(1),
+Int64(-2),
+
 
 SELECT TRUE;
 
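The fixture change records a DataFusion behavior change: NOT on an integer is no longer rejected as a type error but evaluated, and the result here (-2) matches two's-complement bitwise negation of 1.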

proxy/src/influxdb/types.rs

+3-3
@@ -602,7 +602,7 @@ pub(crate) fn convert_influxql_output(output: Output) -> Result<InfluxqlResponse
 mod tests {
     use std::sync::Arc;
 
-    use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema};
+    use arrow::datatypes::{Field as ArrowField, Fields, Schema as ArrowSchema};
     use common_types::{
         column::{ColumnBlock, ColumnBlockBuilder},
         column_schema,
@@ -796,14 +796,14 @@ mod tests {
             false,
         );
         let project_fields = vec![
-            measurement_field,
+            Arc::new(measurement_field),
            fields[1].clone(),
            fields[0].clone(),
            fields[2].clone(),
            fields[3].clone(),
        ];
        let project_arrow_schema = Arc::new(ArrowSchema::new_with_metadata(
-            project_fields,
+            Fields::from(project_fields),
            arrow_schema.metadata().clone(),
        ));
 

query_engine/Cargo.toml

+1-1
@@ -20,7 +20,7 @@ common_util = { workspace = true }
 datafusion = { workspace = true }
 df_operator = { workspace = true }
 futures = { workspace = true }
-iox_query = { git = "https://github.com/CeresDB/influxql" }
+influxql-query = { workspace = true }
 log = { workspace = true }
 query_frontend = { workspace = true }
 serde = { workspace = true }
