@@ -11,6 +11,7 @@ use ceresdbproto::storage::{
11
11
} ;
12
12
use cluster:: config:: SchemaConfig ;
13
13
use common_types:: {
14
+ column_schema:: ColumnSchema ,
14
15
datum:: { Datum , DatumKind } ,
15
16
request_id:: RequestId ,
16
17
row:: { Row , RowGroupBuilder } ,
@@ -20,12 +21,13 @@ use common_types::{
20
21
use common_util:: error:: BoxError ;
21
22
use http:: StatusCode ;
22
23
use interpreters:: { context:: Context as InterpreterContext , factory:: Factory , interpreter:: Output } ;
23
- use log:: debug;
24
+ use log:: { debug, error , info } ;
24
25
use query_engine:: executor:: Executor as QueryExecutor ;
25
26
use snafu:: { ensure, OptionExt , ResultExt } ;
26
27
use sql:: {
27
28
frontend:: { Context as FrontendContext , Frontend } ,
28
- plan:: { InsertPlan , Plan } ,
29
+ plan:: { AlterTableOperation , AlterTablePlan , InsertPlan , Plan } ,
30
+ planner:: build_schema_from_write_table_request,
29
31
provider:: CatalogMetaProvider ,
30
32
} ;
31
33
use table_engine:: table:: TableRef ;
@@ -69,10 +71,13 @@ impl WriteContext {
69
71
impl < Q : QueryExecutor + ' static > Proxy < Q > {
70
72
pub async fn handle_write ( & self , ctx : Context , req : WriteRequest ) -> WriteResponse {
71
73
match self . handle_write_internal ( ctx, req) . await {
72
- Err ( e) => WriteResponse {
73
- header : Some ( error:: build_err_header ( e) ) ,
74
- ..Default :: default ( )
75
- } ,
74
+ Err ( e) => {
75
+ error ! ( "Failed to handle write, err:{e}" ) ;
76
+ WriteResponse {
77
+ header : Some ( error:: build_err_header ( e) ) ,
78
+ ..Default :: default ( )
79
+ }
80
+ }
76
81
Ok ( v) => v,
77
82
}
78
83
}
@@ -126,7 +131,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
126
131
127
132
let mut success = 0 ;
128
133
for insert_plan in plan_vec {
129
- success += execute_plan (
134
+ success += execute_insert_plan (
130
135
request_id,
131
136
catalog,
132
137
& schema,
@@ -167,26 +172,54 @@ pub async fn write_request_to_insert_plan<Q: QueryExecutor + 'static>(
167
172
deadline,
168
173
auto_create_table,
169
174
} = write_context;
170
-
175
+ let schema_config = schema_config . cloned ( ) . unwrap_or_default ( ) ;
171
176
for write_table_req in table_requests {
172
177
let table_name = & write_table_req. table ;
173
178
let mut table = try_get_table ( & catalog, & schema, instance. clone ( ) , table_name) ?;
174
179
175
- if table. is_none ( ) && auto_create_table {
176
- // TODO: remove this clone?
177
- let schema_config = schema_config. cloned ( ) . unwrap_or_default ( ) ;
178
- create_table (
179
- request_id,
180
- & catalog,
181
- & schema,
182
- instance. clone ( ) ,
183
- & write_table_req,
184
- & schema_config,
185
- deadline,
186
- )
187
- . await ?;
188
- // try to get table again
189
- table = try_get_table ( & catalog, & schema, instance. clone ( ) , table_name) ?;
180
+ match table. clone ( ) {
181
+ None => {
182
+ if auto_create_table {
183
+ create_table (
184
+ request_id,
185
+ & catalog,
186
+ & schema,
187
+ instance. clone ( ) ,
188
+ & write_table_req,
189
+ & schema_config,
190
+ deadline,
191
+ )
192
+ . await ?;
193
+ // try to get table again
194
+ table = try_get_table ( & catalog, & schema, instance. clone ( ) , table_name) ?;
195
+ }
196
+ }
197
+ Some ( t) => {
198
+ if auto_create_table {
199
+ // The reasons for making the decision to add columns before writing are as
200
+ // follows:
201
+ // * If judged based on the error message returned, it may cause data that has
202
+ // already been successfully written to be written again and affect the
203
+ // accuracy of the data.
204
+ // * Currently, the decision to add columns is made at the request level, not at
205
+ // the row level, so the cost is relatively small.
206
+ let table_schema = t. schema ( ) ;
207
+ let columns =
208
+ find_new_columns ( & table_schema, & schema_config, & write_table_req) ?;
209
+ if !columns. is_empty ( ) {
210
+ execute_add_columns_plan (
211
+ request_id,
212
+ & catalog,
213
+ & schema,
214
+ instance. clone ( ) ,
215
+ t,
216
+ columns,
217
+ deadline,
218
+ )
219
+ . await ?;
220
+ }
221
+ }
222
+ }
190
223
}
191
224
192
225
match table {
@@ -207,7 +240,7 @@ pub async fn write_request_to_insert_plan<Q: QueryExecutor + 'static>(
207
240
Ok ( plan_vec)
208
241
}
209
242
210
- pub async fn execute_plan < Q : QueryExecutor + ' static > (
243
+ pub async fn execute_insert_plan < Q : QueryExecutor + ' static > (
211
244
request_id : RequestId ,
212
245
catalog : & str ,
213
246
schema : & str ,
@@ -221,50 +254,15 @@ pub async fn execute_plan<Q: QueryExecutor + 'static>(
221
254
insert_plan. rows. num_rows( )
222
255
) ;
223
256
let plan = Plan :: Insert ( insert_plan) ;
224
-
225
- instance
226
- . limiter
227
- . try_limit ( & plan)
228
- . box_err ( )
229
- . context ( ErrWithCause {
230
- code : StatusCode :: INTERNAL_SERVER_ERROR ,
231
- msg : "Insert is blocked" ,
232
- } ) ?;
233
-
234
- let interpreter_ctx = InterpreterContext :: builder ( request_id, deadline)
235
- // Use current ctx's catalog and schema as default catalog and schema
236
- . default_catalog_and_schema ( catalog. to_string ( ) , schema. to_string ( ) )
237
- . build ( ) ;
238
- let interpreter_factory = Factory :: new (
239
- instance. query_executor . clone ( ) ,
240
- instance. catalog_manager . clone ( ) ,
241
- instance. table_engine . clone ( ) ,
242
- instance. table_manipulator . clone ( ) ,
243
- ) ;
244
- let interpreter = interpreter_factory
245
- . create ( interpreter_ctx, plan)
246
- . box_err ( )
247
- . context ( ErrWithCause {
248
- code : StatusCode :: INTERNAL_SERVER_ERROR ,
249
- msg : "Failed to create interpreter" ,
250
- } ) ?;
251
-
252
- interpreter
253
- . execute ( )
254
- . await
255
- . box_err ( )
256
- . context ( ErrWithCause {
257
- code : StatusCode :: INTERNAL_SERVER_ERROR ,
258
- msg : "Failed to execute interpreter" ,
259
- } )
260
- . and_then ( |output| match output {
261
- Output :: AffectedRows ( n) => Ok ( n) ,
262
- Output :: Records ( _) => ErrNoCause {
263
- code : StatusCode :: BAD_REQUEST ,
264
- msg : "Invalid output type, expect AffectedRows, found Records" ,
265
- }
266
- . fail ( ) ,
267
- } )
257
+ let output = execute_plan ( request_id, catalog, schema, instance, plan, deadline) . await ;
258
+ output. and_then ( |output| match output {
259
+ Output :: AffectedRows ( n) => Ok ( n) ,
260
+ Output :: Records ( _) => ErrNoCause {
261
+ code : StatusCode :: BAD_REQUEST ,
262
+ msg : "Invalid output type, expect AffectedRows, found Records" ,
263
+ }
264
+ . fail ( ) ,
265
+ } )
268
266
}
269
267
270
268
fn try_get_table < Q : QueryExecutor + ' static > (
@@ -333,49 +331,15 @@ async fn create_table<Q: QueryExecutor + 'static>(
333
331
334
332
debug ! ( "Grpc handle create table begin, plan:{:?}" , plan) ;
335
333
336
- instance
337
- . limiter
338
- . try_limit ( & plan)
339
- . box_err ( )
340
- . context ( ErrWithCause {
341
- code : StatusCode :: INTERNAL_SERVER_ERROR ,
342
- msg : "Create table is blocked" ,
343
- } ) ?;
344
-
345
- let interpreter_ctx = InterpreterContext :: builder ( request_id, deadline)
346
- // Use current ctx's catalog and schema as default catalog and schema
347
- . default_catalog_and_schema ( catalog. to_string ( ) , schema. to_string ( ) )
348
- . build ( ) ;
349
- let interpreter_factory = Factory :: new (
350
- instance. query_executor . clone ( ) ,
351
- instance. catalog_manager . clone ( ) ,
352
- instance. table_engine . clone ( ) ,
353
- instance. table_manipulator . clone ( ) ,
354
- ) ;
355
- let interpreter = interpreter_factory
356
- . create ( interpreter_ctx, plan)
357
- . box_err ( )
358
- . context ( ErrWithCause {
359
- code : StatusCode :: INTERNAL_SERVER_ERROR ,
360
- msg : "Failed to create interpreter" ,
361
- } ) ?;
362
-
363
- interpreter
364
- . execute ( )
365
- . await
366
- . box_err ( )
367
- . context ( ErrWithCause {
334
+ let output = execute_plan ( request_id, catalog, schema, instance, plan, deadline) . await ;
335
+ output. and_then ( |output| match output {
336
+ Output :: AffectedRows ( _) => Ok ( ( ) ) ,
337
+ Output :: Records ( _) => ErrNoCause {
368
338
code : StatusCode :: INTERNAL_SERVER_ERROR ,
369
- msg : "Failed to execute interpreter" ,
370
- } )
371
- . and_then ( |output| match output {
372
- Output :: AffectedRows ( _) => Ok ( ( ) ) ,
373
- Output :: Records ( _) => ErrNoCause {
374
- code : StatusCode :: INTERNAL_SERVER_ERROR ,
375
- msg : "Invalid output type, expect AffectedRows, found Records" ,
376
- }
377
- . fail ( ) ,
378
- } )
339
+ msg : "Invalid output type, expect AffectedRows, found Records" ,
340
+ }
341
+ . fail ( ) ,
342
+ } )
379
343
}
380
344
381
345
fn write_table_request_to_insert_plan (
@@ -576,6 +540,92 @@ fn convert_proto_value_to_datum(
576
540
}
577
541
}
578
542
543
+ fn find_new_columns (
544
+ schema : & Schema ,
545
+ schema_config : & SchemaConfig ,
546
+ write_req : & WriteTableRequest ,
547
+ ) -> Result < Vec < ColumnSchema > > {
548
+ let new_schema = build_schema_from_write_table_request ( schema_config, write_req)
549
+ . box_err ( )
550
+ . context ( ErrWithCause {
551
+ code : StatusCode :: INTERNAL_SERVER_ERROR ,
552
+ msg : "Build schema from write table request failed" ,
553
+ } ) ?;
554
+
555
+ let columns = new_schema. columns ( ) ;
556
+ let old_columns = schema. columns ( ) ;
557
+
558
+ let new_columns = columns
559
+ . iter ( )
560
+ . filter ( |column| !old_columns. iter ( ) . any ( |c| c. name == column. name ) )
561
+ . cloned ( )
562
+ . collect ( ) ;
563
+ Ok ( new_columns)
564
+ }
565
+
566
+ async fn execute_add_columns_plan < Q : QueryExecutor + ' static > (
567
+ request_id : RequestId ,
568
+ catalog : & str ,
569
+ schema : & str ,
570
+ instance : InstanceRef < Q > ,
571
+ table : TableRef ,
572
+ columns : Vec < ColumnSchema > ,
573
+ deadline : Option < Instant > ,
574
+ ) -> Result < ( ) > {
575
+ let table_name = table. name ( ) . to_string ( ) ;
576
+ info ! ( "Add columns start, request_id:{request_id}, table:{table_name}, columns:{columns:?}" ) ;
577
+
578
+ let plan = Plan :: AlterTable ( AlterTablePlan {
579
+ table,
580
+ operations : AlterTableOperation :: AddColumn ( columns) ,
581
+ } ) ;
582
+ let _ = execute_plan ( request_id, catalog, schema, instance, plan, deadline) . await ?;
583
+
584
+ info ! ( "Add columns success, request_id:{request_id}, table:{table_name}" ) ;
585
+ Ok ( ( ) )
586
+ }
587
+
588
+ async fn execute_plan < Q : QueryExecutor + ' static > (
589
+ request_id : RequestId ,
590
+ catalog : & str ,
591
+ schema : & str ,
592
+ instance : InstanceRef < Q > ,
593
+ plan : Plan ,
594
+ deadline : Option < Instant > ,
595
+ ) -> Result < Output > {
596
+ instance
597
+ . limiter
598
+ . try_limit ( & plan)
599
+ . box_err ( )
600
+ . context ( ErrWithCause {
601
+ code : StatusCode :: INTERNAL_SERVER_ERROR ,
602
+ msg : "Request is blocked" ,
603
+ } ) ?;
604
+
605
+ let interpreter_ctx = InterpreterContext :: builder ( request_id, deadline)
606
+ // Use current ctx's catalog and schema as default catalog and schema
607
+ . default_catalog_and_schema ( catalog. to_string ( ) , schema. to_string ( ) )
608
+ . build ( ) ;
609
+ let interpreter_factory = Factory :: new (
610
+ instance. query_executor . clone ( ) ,
611
+ instance. catalog_manager . clone ( ) ,
612
+ instance. table_engine . clone ( ) ,
613
+ instance. table_manipulator . clone ( ) ,
614
+ ) ;
615
+ let interpreter = interpreter_factory
616
+ . create ( interpreter_ctx, plan)
617
+ . box_err ( )
618
+ . context ( ErrWithCause {
619
+ code : StatusCode :: INTERNAL_SERVER_ERROR ,
620
+ msg : "Failed to create interpreter" ,
621
+ } ) ?;
622
+
623
+ interpreter. execute ( ) . await . box_err ( ) . context ( ErrWithCause {
624
+ code : StatusCode :: INTERNAL_SERVER_ERROR ,
625
+ msg : "Failed to execute interpreter" ,
626
+ } )
627
+ }
628
+
579
629
#[ cfg( test) ]
580
630
mod test {
581
631
use ceresdbproto:: storage:: { Field , FieldGroup , Tag , Value } ;
0 commit comments