@@ -10,7 +10,7 @@ use std::{
10
10
11
11
use bytes:: Bytes ;
12
12
use ceresdbproto:: storage:: {
13
- storage_service_client:: StorageServiceClient , value, RouteRequest , WriteRequest ,
13
+ storage_service_client:: StorageServiceClient , value, RouteRequest , Value , WriteRequest ,
14
14
WriteResponse as WriteResponsePB , WriteSeriesEntry , WriteTableRequest ,
15
15
} ;
16
16
use cluster:: config:: SchemaConfig ;
@@ -34,7 +34,7 @@ use query_engine::executor::Executor as QueryExecutor;
34
34
use query_frontend:: {
35
35
frontend:: { Context as FrontendContext , Frontend } ,
36
36
plan:: { AlterTableOperation , AlterTablePlan , InsertPlan , Plan } ,
37
- planner:: build_schema_from_write_table_request ,
37
+ planner:: { build_column_schema , try_get_data_type_from_value } ,
38
38
provider:: CatalogMetaProvider ,
39
39
} ;
40
40
use router:: endpoint:: Endpoint ;
@@ -43,7 +43,7 @@ use table_engine::table::TableRef;
43
43
use tonic:: transport:: Channel ;
44
44
45
45
use crate :: {
46
- error:: { ErrNoCause , ErrWithCause , InternalNoCause , Result } ,
46
+ error:: { ErrNoCause , ErrWithCause , Internal , InternalNoCause , Result } ,
47
47
forward:: { ForwardResult , ForwarderRef } ,
48
48
Context , Proxy ,
49
49
} ;
@@ -477,14 +477,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
477
477
code : StatusCode :: BAD_REQUEST ,
478
478
} ) ?;
479
479
let schema = req_ctx. database ;
480
- let schema_config = self
481
- . schema_config_provider
482
- . schema_config ( & schema)
483
- . box_err ( )
484
- . with_context ( || ErrWithCause {
485
- code : StatusCode :: INTERNAL_SERVER_ERROR ,
486
- msg : format ! ( "Fail to fetch schema config, schema_name:{schema}" ) ,
487
- } ) ?;
488
480
489
481
debug ! (
490
482
"Local write begin, catalog:{catalog}, schema:{schema}, request_id:{request_id}, first_table:{:?}, num_tables:{}" ,
@@ -503,7 +495,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
503
495
} ;
504
496
505
497
let plan_vec = self
506
- . write_request_to_insert_plan ( req. table_requests , schema_config , write_context)
498
+ . write_request_to_insert_plan ( req. table_requests , write_context)
507
499
. await ?;
508
500
509
501
let mut success = 0 ;
@@ -522,7 +514,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
522
514
async fn write_request_to_insert_plan (
523
515
& self ,
524
516
table_requests : Vec < WriteTableRequest > ,
525
- schema_config : Option < & SchemaConfig > ,
526
517
write_context : WriteContext ,
527
518
) -> Result < Vec < InsertPlan > > {
528
519
let mut plan_vec = Vec :: with_capacity ( table_requests. len ( ) ) ;
@@ -534,7 +525,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
534
525
deadline,
535
526
auto_create_table,
536
527
} = write_context;
537
- let schema_config = schema_config. cloned ( ) . unwrap_or_default ( ) ;
538
528
for write_table_req in table_requests {
539
529
let table_name = & write_table_req. table ;
540
530
self . maybe_open_partition_table_if_not_exist ( & catalog, & schema, table_name)
@@ -555,7 +545,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
555
545
// * Currently, the decision to add columns is made at the request level, not at
556
546
// the row level, so the cost is relatively small.
557
547
let table_schema = table. schema ( ) ;
558
- let columns = find_new_columns ( & table_schema, & schema_config , & write_table_req) ?;
548
+ let columns = find_new_columns ( & table_schema, & write_table_req) ?;
559
549
if !columns. is_empty ( ) {
560
550
self . execute_add_columns_plan (
561
551
request_id,
@@ -668,32 +658,93 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
668
658
669
659
fn find_new_columns (
670
660
schema : & Schema ,
671
- schema_config : & SchemaConfig ,
672
- write_req : & WriteTableRequest ,
661
+ write_table_req : & WriteTableRequest ,
673
662
) -> Result < Vec < ColumnSchema > > {
674
- let new_schema = build_schema_from_write_table_request ( schema_config, write_req)
663
+ let WriteTableRequest {
664
+ table,
665
+ field_names,
666
+ tag_names,
667
+ entries : write_entries,
668
+ } = write_table_req;
669
+
670
+ let mut columns: HashMap < _ , ColumnSchema > = HashMap :: new ( ) ;
671
+ for write_entry in write_entries {
672
+ // Parse tags.
673
+ for tag in & write_entry. tags {
674
+ let name_index = tag. name_index as usize ;
675
+ ensure ! (
676
+ name_index < tag_names. len( ) ,
677
+ InternalNoCause {
678
+ msg: format!(
679
+ "Tag {tag:?} is not found in tag_names:{tag_names:?}, table:{table}" ,
680
+ ) ,
681
+ }
682
+ ) ;
683
+
684
+ let tag_name = & tag_names[ name_index] ;
685
+
686
+ build_column ( & mut columns, schema, tag_name, & tag. value , true ) ?;
687
+ }
688
+
689
+ // Parse fields.
690
+ for field_group in & write_entry. field_groups {
691
+ for field in & field_group. fields {
692
+ let field_index = field. name_index as usize ;
693
+ ensure ! (
694
+ field_index < field_names. len( ) ,
695
+ InternalNoCause {
696
+ msg: format!(
697
+ "Field {field:?} is not found in field_names:{field_names:?}, table:{table}" ,
698
+ ) ,
699
+ }
700
+ ) ;
701
+ let field_name = & field_names[ field. name_index as usize ] ;
702
+ build_column ( & mut columns, schema, field_name, & field. value , false ) ?;
703
+ }
704
+ }
705
+ }
706
+
707
+ Ok ( columns. into_iter ( ) . map ( |v| v. 1 ) . collect ( ) )
708
+ }
709
+
710
+ fn build_column < ' a > (
711
+ columns : & mut HashMap < & ' a str , ColumnSchema > ,
712
+ schema : & Schema ,
713
+ name : & ' a str ,
714
+ value : & Option < Value > ,
715
+ is_tag : bool ,
716
+ ) -> Result < ( ) > {
717
+ // Skip adding columns, the following cases:
718
+ // 1. Column already exists.
719
+ // 2. The new column has been added.
720
+ if schema. index_of ( name) . is_some ( ) || columns. get ( name) . is_some ( ) {
721
+ return Ok ( ( ) ) ;
722
+ }
723
+
724
+ let column_value = value
725
+ . as_ref ( )
726
+ . with_context ( || InternalNoCause {
727
+ msg : format ! ( "Column value is needed, column:{name}" ) ,
728
+ } ) ?
729
+ . value
730
+ . as_ref ( )
731
+ . with_context ( || InternalNoCause {
732
+ msg : format ! ( "Column value type is not supported, column:{name}" ) ,
733
+ } ) ?;
734
+
735
+ let data_type = try_get_data_type_from_value ( column_value)
675
736
. box_err ( )
676
- . context ( ErrWithCause {
677
- code : StatusCode :: INTERNAL_SERVER_ERROR ,
678
- msg : "Build schema from write table request failed" ,
737
+ . context ( Internal {
738
+ msg : "Failed to get data type" ,
679
739
} ) ?;
680
740
681
- let columns = new_schema. columns ( ) ;
682
- let old_columns = schema. columns ( ) ;
683
-
684
- // find new columns:
685
- // 1. timestamp column can't be a new column;
686
- // 2. column not in old schema is a new column.
687
- let new_columns = columns
688
- . iter ( )
689
- . enumerate ( )
690
- . filter ( |( idx, column) | {
691
- * idx != new_schema. timestamp_index ( )
692
- && !old_columns. iter ( ) . any ( |c| c. name == column. name )
693
- } )
694
- . map ( |( _, column) | column. clone ( ) )
695
- . collect ( ) ;
696
- Ok ( new_columns)
741
+ let column_schema = build_column_schema ( name, data_type, is_tag)
742
+ . box_err ( )
743
+ . context ( Internal {
744
+ msg : "Failed to build column schema" ,
745
+ } ) ?;
746
+ columns. insert ( name, column_schema) ;
747
+ Ok ( ( ) )
697
748
}
698
749
699
750
fn write_table_request_to_insert_plan (
@@ -898,7 +949,7 @@ fn convert_proto_value_to_datum(
898
949
mod test {
899
950
use ceresdbproto:: storage:: { value, Field , FieldGroup , Tag , Value , WriteSeriesEntry } ;
900
951
use common_types:: {
901
- column_schema:: { self , ColumnSchema } ,
952
+ column_schema:: { self } ,
902
953
datum:: { Datum , DatumKind } ,
903
954
row:: Row ,
904
955
schema:: Builder ,
@@ -908,45 +959,136 @@ mod test {
908
959
909
960
use super :: * ;
910
961
911
- const TAG_K : & str = "tagk" ;
912
- const TAG_V : & str = "tagv" ;
913
- const TAG_K1 : & str = "tagk1" ;
914
- const TAG_V1 : & str = "tagv1" ;
915
- const FIELD_NAME : & str = "field" ;
916
- const FIELD_NAME1 : & str = "field1" ;
917
- const FIELD_VALUE_STRING : & str = "stringValue" ;
962
+ const NAME_COL1 : & str = "col1" ;
963
+ const NAME_NEW_COL1 : & str = "new_col1" ;
964
+ const NAME_COL2 : & str = "col2" ;
965
+ const NAME_COL3 : & str = "col3" ;
966
+ const NAME_COL4 : & str = "col4" ;
967
+ const NAME_COL5 : & str = "col5" ;
918
968
919
- // tag_names field_names write_entry
920
- fn generate_write_entry ( ) -> ( Schema , Vec < String > , Vec < String > , WriteSeriesEntry ) {
921
- let tag_names = vec ! [ TAG_K . to_string( ) , TAG_K1 . to_string( ) ] ;
922
- let field_names = vec ! [ FIELD_NAME . to_string( ) , FIELD_NAME1 . to_string( ) ] ;
969
+ #[ test]
970
+ fn test_write_entry_to_row_group ( ) {
971
+ let ( schema, tag_names, field_names, write_entry) = generate_write_entry ( ) ;
972
+ let rows =
973
+ write_entry_to_rows ( "test_table" , & schema, & tag_names, & field_names, write_entry)
974
+ . unwrap ( ) ;
975
+ let row0 = vec ! [
976
+ Datum :: Timestamp ( Timestamp :: new( 1000 ) ) ,
977
+ Datum :: String ( NAME_COL1 . into( ) ) ,
978
+ Datum :: String ( NAME_COL2 . into( ) ) ,
979
+ Datum :: Int64 ( 100 ) ,
980
+ Datum :: Null ,
981
+ ] ;
982
+ let row1 = vec ! [
983
+ Datum :: Timestamp ( Timestamp :: new( 2000 ) ) ,
984
+ Datum :: String ( NAME_COL1 . into( ) ) ,
985
+ Datum :: String ( NAME_COL2 . into( ) ) ,
986
+ Datum :: Null ,
987
+ Datum :: Int64 ( 10 ) ,
988
+ ] ;
989
+ let row2 = vec ! [
990
+ Datum :: Timestamp ( Timestamp :: new( 3000 ) ) ,
991
+ Datum :: String ( NAME_COL1 . into( ) ) ,
992
+ Datum :: String ( NAME_COL2 . into( ) ) ,
993
+ Datum :: Null ,
994
+ Datum :: Int64 ( 10 ) ,
995
+ ] ;
923
996
924
- let tag = Tag {
925
- name_index : 0 ,
926
- value : Some ( Value {
927
- value : Some ( value:: Value :: StringValue ( TAG_V . to_string ( ) ) ) ,
928
- } ) ,
929
- } ;
930
- let tag1 = Tag {
931
- name_index : 1 ,
997
+ let expect_rows = vec ! [
998
+ Row :: from_datums( row0) ,
999
+ Row :: from_datums( row1) ,
1000
+ Row :: from_datums( row2) ,
1001
+ ] ;
1002
+ assert_eq ! ( rows, expect_rows) ;
1003
+ }
1004
+
1005
+ #[ test]
1006
+ fn test_find_new_columns ( ) {
1007
+ let write_table_request = generate_write_table_request ( ) ;
1008
+ let schema = build_schema ( ) ;
1009
+ let new_columns = find_new_columns ( & schema, & write_table_request)
1010
+ . unwrap ( )
1011
+ . into_iter ( )
1012
+ . map ( |v| ( v. name . clone ( ) , v) )
1013
+ . collect :: < HashMap < _ , _ > > ( ) ;
1014
+
1015
+ assert_eq ! ( new_columns. len( ) , 2 ) ;
1016
+ assert ! ( new_columns. get( NAME_NEW_COL1 ) . is_some( ) ) ;
1017
+ assert ! ( new_columns. get( NAME_NEW_COL1 ) . unwrap( ) . is_tag) ;
1018
+ assert ! ( new_columns. get( NAME_COL5 ) . is_some( ) ) ;
1019
+ assert ! ( !new_columns. get( NAME_COL5 ) . unwrap( ) . is_tag) ;
1020
+ }
1021
+
1022
+ fn build_schema ( ) -> Schema {
1023
+ Builder :: new ( )
1024
+ . auto_increment_column_id ( true )
1025
+ . add_key_column (
1026
+ column_schema:: Builder :: new (
1027
+ TIMESTAMP_COLUMN_NAME . to_string ( ) ,
1028
+ DatumKind :: Timestamp ,
1029
+ )
1030
+ . build ( )
1031
+ . unwrap ( ) ,
1032
+ )
1033
+ . unwrap ( )
1034
+ . add_key_column (
1035
+ column_schema:: Builder :: new ( NAME_COL1 . to_string ( ) , DatumKind :: String )
1036
+ . is_tag ( true )
1037
+ . build ( )
1038
+ . unwrap ( ) ,
1039
+ )
1040
+ . unwrap ( )
1041
+ . add_key_column (
1042
+ column_schema:: Builder :: new ( NAME_COL2 . to_string ( ) , DatumKind :: String )
1043
+ . is_tag ( true )
1044
+ . build ( )
1045
+ . unwrap ( ) ,
1046
+ )
1047
+ . unwrap ( )
1048
+ . add_normal_column (
1049
+ column_schema:: Builder :: new ( NAME_COL3 . to_string ( ) , DatumKind :: Int64 )
1050
+ . build ( )
1051
+ . unwrap ( ) ,
1052
+ )
1053
+ . unwrap ( )
1054
+ . add_normal_column (
1055
+ column_schema:: Builder :: new ( NAME_COL4 . to_string ( ) , DatumKind :: Int64 )
1056
+ . build ( )
1057
+ . unwrap ( ) ,
1058
+ )
1059
+ . unwrap ( )
1060
+ . build ( )
1061
+ . unwrap ( )
1062
+ }
1063
+
1064
+ fn make_tag ( name_index : u32 , val : & str ) -> Tag {
1065
+ Tag {
1066
+ name_index,
932
1067
value : Some ( Value {
933
- value : Some ( value:: Value :: StringValue ( TAG_V1 . to_string ( ) ) ) ,
1068
+ value : Some ( value:: Value :: StringValue ( val . to_string ( ) ) ) ,
934
1069
} ) ,
935
- } ;
1070
+ }
1071
+ }
1072
+
1073
+ fn make_field ( name_index : u32 , val : value:: Value ) -> Field {
1074
+ Field {
1075
+ name_index,
1076
+ value : Some ( Value { value : Some ( val) } ) ,
1077
+ }
1078
+ }
1079
+
1080
+ // tag_names field_names write_entry
1081
+ fn generate_write_entry ( ) -> ( Schema , Vec < String > , Vec < String > , WriteSeriesEntry ) {
1082
+ let tag_names = vec ! [ NAME_COL1 . to_string( ) , NAME_COL2 . to_string( ) ] ;
1083
+ let field_names = vec ! [ NAME_COL3 . to_string( ) , NAME_COL4 . to_string( ) ] ;
1084
+
1085
+ let tag = make_tag ( 0 , NAME_COL1 ) ;
1086
+ let tag1 = make_tag ( 1 , NAME_COL2 ) ;
936
1087
let tags = vec ! [ tag, tag1] ;
937
1088
938
- let field = Field {
939
- name_index : 0 ,
940
- value : Some ( Value {
941
- value : Some ( value:: Value :: Float64Value ( 100.0 ) ) ,
942
- } ) ,
943
- } ;
944
- let field1 = Field {
945
- name_index : 1 ,
946
- value : Some ( Value {
947
- value : Some ( value:: Value :: StringValue ( FIELD_VALUE_STRING . to_string ( ) ) ) ,
948
- } ) ,
949
- } ;
1089
+ let field = make_field ( 0 , value:: Value :: Int64Value ( 100 ) ) ;
1090
+ let field1 = make_field ( 1 , value:: Value :: Int64Value ( 10 ) ) ;
1091
+
950
1092
let field_group = FieldGroup {
951
1093
timestamp : 1000 ,
952
1094
fields : vec ! [ field] ,
@@ -965,102 +1107,44 @@ mod test {
965
1107
field_groups : vec ! [ field_group, field_group1, field_group2] ,
966
1108
} ;
967
1109
968
- let schema_builder = Builder :: new ( ) ;
969
- let schema = schema_builder
970
- . auto_increment_column_id ( true )
971
- . add_key_column ( ColumnSchema {
972
- id : column_schema:: COLUMN_ID_UNINIT ,
973
- name : TIMESTAMP_COLUMN_NAME . to_string ( ) ,
974
- data_type : DatumKind :: Timestamp ,
975
- is_nullable : false ,
976
- is_tag : false ,
977
- comment : String :: new ( ) ,
978
- escaped_name : TIMESTAMP_COLUMN_NAME . escape_debug ( ) . to_string ( ) ,
979
- default_value : None ,
980
- } )
981
- . unwrap ( )
982
- . add_key_column ( ColumnSchema {
983
- id : column_schema:: COLUMN_ID_UNINIT ,
984
- name : TAG_K . to_string ( ) ,
985
- data_type : DatumKind :: String ,
986
- is_nullable : false ,
987
- is_tag : true ,
988
- comment : String :: new ( ) ,
989
- escaped_name : TAG_K . escape_debug ( ) . to_string ( ) ,
990
- default_value : None ,
991
- } )
992
- . unwrap ( )
993
- . add_normal_column ( ColumnSchema {
994
- id : column_schema:: COLUMN_ID_UNINIT ,
995
- name : TAG_K1 . to_string ( ) ,
996
- data_type : DatumKind :: String ,
997
- is_nullable : false ,
998
- is_tag : true ,
999
- comment : String :: new ( ) ,
1000
- escaped_name : TAG_K1 . escape_debug ( ) . to_string ( ) ,
1001
- default_value : None ,
1002
- } )
1003
- . unwrap ( )
1004
- . add_normal_column ( ColumnSchema {
1005
- id : column_schema:: COLUMN_ID_UNINIT ,
1006
- name : FIELD_NAME . to_string ( ) ,
1007
- data_type : DatumKind :: Double ,
1008
- is_nullable : true ,
1009
- is_tag : false ,
1010
- comment : String :: new ( ) ,
1011
- escaped_name : FIELD_NAME . escape_debug ( ) . to_string ( ) ,
1012
- default_value : None ,
1013
- } )
1014
- . unwrap ( )
1015
- . add_normal_column ( ColumnSchema {
1016
- id : column_schema:: COLUMN_ID_UNINIT ,
1017
- name : FIELD_NAME1 . to_string ( ) ,
1018
- data_type : DatumKind :: String ,
1019
- is_nullable : true ,
1020
- is_tag : false ,
1021
- comment : String :: new ( ) ,
1022
- escaped_name : FIELD_NAME1 . escape_debug ( ) . to_string ( ) ,
1023
- default_value : None ,
1024
- } )
1025
- . unwrap ( )
1026
- . build ( )
1027
- . unwrap ( ) ;
1110
+ let schema = build_schema ( ) ;
1028
1111
( schema, tag_names, field_names, write_entry)
1029
1112
}
1030
1113
1031
- #[ test]
1032
- fn test_write_entry_to_row_group ( ) {
1033
- let ( schema, tag_names, field_names, write_entry) = generate_write_entry ( ) ;
1034
- let rows =
1035
- write_entry_to_rows ( "test_table" , & schema, & tag_names, & field_names, write_entry)
1036
- . unwrap ( ) ;
1037
- let row0 = vec ! [
1038
- Datum :: Timestamp ( Timestamp :: new( 1000 ) ) ,
1039
- Datum :: String ( TAG_V . into( ) ) ,
1040
- Datum :: String ( TAG_V1 . into( ) ) ,
1041
- Datum :: Double ( 100.0 ) ,
1042
- Datum :: Null ,
1043
- ] ;
1044
- let row1 = vec ! [
1045
- Datum :: Timestamp ( Timestamp :: new( 2000 ) ) ,
1046
- Datum :: String ( TAG_V . into( ) ) ,
1047
- Datum :: String ( TAG_V1 . into( ) ) ,
1048
- Datum :: Null ,
1049
- Datum :: String ( FIELD_VALUE_STRING . into( ) ) ,
1050
- ] ;
1051
- let row2 = vec ! [
1052
- Datum :: Timestamp ( Timestamp :: new( 3000 ) ) ,
1053
- Datum :: String ( TAG_V . into( ) ) ,
1054
- Datum :: String ( TAG_V1 . into( ) ) ,
1055
- Datum :: Null ,
1056
- Datum :: String ( FIELD_VALUE_STRING . into( ) ) ,
1057
- ] ;
1114
+ fn generate_write_table_request ( ) -> WriteTableRequest {
1115
+ let tag1 = make_tag ( 0 , NAME_NEW_COL1 ) ;
1116
+ let tag2 = make_tag ( 1 , NAME_COL1 ) ;
1117
+ let tags = vec ! [ tag1, tag2] ;
1058
1118
1059
- let expect_rows = vec ! [
1060
- Row :: from_datums( row0) ,
1061
- Row :: from_datums( row1) ,
1062
- Row :: from_datums( row2) ,
1063
- ] ;
1064
- assert_eq ! ( rows, expect_rows) ;
1119
+ let field1 = make_field ( 0 , value:: Value :: Int64Value ( 100 ) ) ;
1120
+ let field2 = make_field ( 1 , value:: Value :: Int64Value ( 10 ) ) ;
1121
+
1122
+ let field_group1 = FieldGroup {
1123
+ timestamp : 1000 ,
1124
+ fields : vec ! [ field1. clone( ) , field2. clone( ) ] ,
1125
+ } ;
1126
+ let field_group2 = FieldGroup {
1127
+ timestamp : 2000 ,
1128
+ fields : vec ! [ field1] ,
1129
+ } ;
1130
+ let field_group3 = FieldGroup {
1131
+ timestamp : 3000 ,
1132
+ fields : vec ! [ field2] ,
1133
+ } ;
1134
+
1135
+ let write_entry = WriteSeriesEntry {
1136
+ tags,
1137
+ field_groups : vec ! [ field_group1, field_group2, field_group3] ,
1138
+ } ;
1139
+
1140
+ let tag_names = vec ! [ NAME_NEW_COL1 . to_string( ) , NAME_COL1 . to_string( ) ] ;
1141
+ let field_names = vec ! [ NAME_COL3 . to_string( ) , NAME_COL5 . to_string( ) ] ;
1142
+
1143
+ WriteTableRequest {
1144
+ table : "test" . to_string ( ) ,
1145
+ tag_names,
1146
+ field_names,
1147
+ entries : vec ! [ write_entry] ,
1148
+ }
1065
1149
}
1066
1150
}
0 commit comments