@@ -6,8 +6,12 @@ use std::sync::Arc;
6
6
7
7
use common_util:: error:: BoxError ;
8
8
use influxql_logical_planner:: planner:: InfluxQLToLogicalPlan ;
9
- use influxql_parser:: statement:: Statement as InfluxqlStatement ;
10
- use snafu:: ResultExt ;
9
+ use influxql_parser:: {
10
+ common:: { MeasurementName , QualifiedMeasurementName } ,
11
+ select:: { MeasurementSelection , SelectStatement } ,
12
+ statement:: Statement as InfluxqlStatement ,
13
+ } ;
14
+ use snafu:: { ensure, ResultExt } ;
11
15
12
16
use crate :: {
13
17
influxql:: { error:: * , provider:: InfluxSchemaProviderImpl } ,
@@ -44,13 +48,19 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
44
48
| InfluxqlStatement :: Delete ( _)
45
49
| InfluxqlStatement :: DropMeasurement ( _)
46
50
| InfluxqlStatement :: Explain ( _) => Unimplemented {
47
- stmt : stmt. to_string ( ) ,
51
+ msg : stmt. to_string ( ) ,
48
52
}
49
53
. fail ( ) ,
50
54
}
51
55
}
52
56
53
57
pub fn select_to_plan ( self , stmt : InfluxqlStatement ) -> Result < Plan > {
58
+ if let InfluxqlStatement :: Select ( select_stmt) = & stmt {
59
+ check_select_statement ( select_stmt) ?;
60
+ } else {
61
+ unreachable ! ( "select statement here has been ensured by caller" ) ;
62
+ }
63
+
54
64
let influx_schema_provider = InfluxSchemaProviderImpl {
55
65
context_provider : & self . context_provider ,
56
66
} ;
@@ -62,18 +72,94 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
62
72
let df_plan = influxql_logical_planner
63
73
. statement_to_plan ( stmt)
64
74
. box_err ( )
65
- . context ( BuildPlan {
75
+ . context ( BuildPlanWithCause {
66
76
msg : "build df plan for influxql select statement" ,
67
77
} ) ?;
68
78
let tables = Arc :: new (
69
79
self . context_provider
70
80
. try_into_container ( )
71
81
. box_err ( )
72
- . context ( BuildPlan {
82
+ . context ( BuildPlanWithCause {
73
83
msg : "get tables from df plan of select" ,
74
84
} ) ?,
75
85
) ;
76
86
77
87
Ok ( Plan :: Query ( QueryPlan { df_plan, tables } ) )
78
88
}
79
89
}
90
+
91
+ pub fn check_select_statement ( select_stmt : & SelectStatement ) -> Result < ( ) > {
92
+ // Only support from single measurements now.
93
+ ensure ! (
94
+ !select_stmt. from. is_empty( ) ,
95
+ BuildPlanNoCause {
96
+ msg: format!( "invalid influxql select statement with empty from, stmt:{select_stmt}" ) ,
97
+ }
98
+ ) ;
99
+ ensure ! (
100
+ select_stmt. from. len( ) == 1 ,
101
+ Unimplemented {
102
+ msg: format!( "select from multiple measurements, stmt:{select_stmt}" ) ,
103
+ }
104
+ ) ;
105
+
106
+ let from = & select_stmt. from [ 0 ] ;
107
+ match from {
108
+ MeasurementSelection :: Name ( name) => {
109
+ let QualifiedMeasurementName { name, .. } = name;
110
+
111
+ match name {
112
+ MeasurementName :: Regex ( _) => Unimplemented {
113
+ msg : format ! ( "select from regex, stmt:{select_stmt}" ) ,
114
+ }
115
+ . fail ( ) ,
116
+ MeasurementName :: Name ( _) => Ok ( ( ) ) ,
117
+ }
118
+ }
119
+ MeasurementSelection :: Subquery ( _) => Unimplemented {
120
+ msg : format ! ( "select from subquery, stmt:{select_stmt}" ) ,
121
+ }
122
+ . fail ( ) ,
123
+ }
124
+ }
125
+
126
+ #[ cfg( test) ]
127
+ mod test {
128
+ use influxql_parser:: { select:: SelectStatement , statement:: Statement } ;
129
+
130
+ use super :: check_select_statement;
131
+
132
+ #[ test]
133
+ fn test_check_select_from ( ) {
134
+ let from_measurement = parse_select ( "select * from a;" ) ;
135
+ let from_multi_measurements = parse_select ( "select * from a,b;" ) ;
136
+ let from_regex = parse_select ( r#"select * from /d/"# ) ;
137
+ let from_subquery = parse_select ( "select * from (select a,b from c)" ) ;
138
+
139
+ let res = check_select_statement ( & from_measurement) ;
140
+ assert ! ( res. is_ok( ) ) ;
141
+
142
+ let res = check_select_statement ( & from_multi_measurements) ;
143
+ let err = res. err ( ) . unwrap ( ) ;
144
+ assert ! ( err
145
+ . to_string( )
146
+ . contains( "select from multiple measurements" ) ) ;
147
+
148
+ let res = check_select_statement ( & from_regex) ;
149
+ let err = res. err ( ) . unwrap ( ) ;
150
+ assert ! ( err. to_string( ) . contains( "select from regex" ) ) ;
151
+
152
+ let res = check_select_statement ( & from_subquery) ;
153
+ let err = res. err ( ) . unwrap ( ) ;
154
+ assert ! ( err. to_string( ) . contains( "select from subquery" ) ) ;
155
+ }
156
+
157
+ fn parse_select ( influxql : & str ) -> SelectStatement {
158
+ let stmt = influxql_parser:: parse_statements ( influxql) . unwrap ( ) [ 0 ] . clone ( ) ;
159
+ if let Statement :: Select ( select_stmt) = stmt {
160
+ * select_stmt
161
+ } else {
162
+ unreachable ! ( )
163
+ }
164
+ }
165
+ }
0 commit comments