Skip to content

Commit aaa650a

Browse files
authored
feat: auto add column (apache#749)
* feat: introduce proxy module (apache#715) * impl route service with proxy * impl write service with proxy * remove forward module in proxy * refactor code * add tests in write * feat: impl query with proxy (apache#717) * refactor: refactor proxy module (apache#726) * refactor: refactor proxy module * cargo fmt * refactor by CR * Feat proxy prom query (apache#727) * feat: impl prom query with proxy * refactor code * feat: impl stream write with proxy (apache#737) * feat: impl stream query with proxy (apache#742) * feat: impl stream query with proxy * refactor by CR * feat: introduce proxy module * refactor code * add header in storage service * feat: impl storage service with proxy * make CI happy * refactor code * refactor code * refactor by CR * feat: automatically create non-existent columns during insertion * test: add autoAddColumns test in go sdk * refactor code * refactor by CR * refactor by CR
1 parent 54b72b0 commit aaa650a

File tree

8 files changed

+259
-150
lines changed

8 files changed

+259
-150
lines changed

integration_tests/sdk/go/main.go

+76-26
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,21 @@ func init() {
1919
}
2020
}
2121

22-
func write(ctx context.Context, client ceresdb.Client, ts int64) error {
22+
func write(ctx context.Context, client ceresdb.Client, ts int64, addNewColumn bool) error {
2323
points := make([]ceresdb.Point, 0, 2)
2424
for i := 0; i < 2; i++ {
25-
point, err := ceresdb.NewPointBuilder(table).
25+
builder := ceresdb.NewPointBuilder(table).
2626
SetTimestamp(ts).
2727
AddTag("name", ceresdb.NewStringValue(fmt.Sprintf("tag-%d", i))).
28-
AddField("value", ceresdb.NewInt64Value(int64(i))).
29-
Build()
28+
AddField("value", ceresdb.NewInt64Value(int64(i)))
29+
30+
if addNewColumn {
31+
builder = builder.AddTag("new_tag", ceresdb.NewStringValue(fmt.Sprintf("new-tag-%d", i))).
32+
AddField("new_field", ceresdb.NewInt64Value(int64(i)))
33+
}
34+
35+
point, err := builder.Build()
36+
3037
if err != nil {
3138
return err
3239
}
@@ -57,10 +64,10 @@ func ensureRow(expectedVals []ceresdb.Value, actualRow []ceresdb.Column) error {
5764

5865
}
5966

60-
func query(ctx context.Context, client ceresdb.Client, ts int64) error {
67+
func query(ctx context.Context, client ceresdb.Client, ts int64, addNewColumn bool) error {
6168
resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{
6269
Tables: []string{table},
63-
SQL: fmt.Sprintf("select * from %s where timestamp = %d", table, ts),
70+
SQL: fmt.Sprintf("select * from %s where timestamp = %d order by name", table, ts),
6471
})
6572
if err != nil {
6673
return err
@@ -70,21 +77,32 @@ func query(ctx context.Context, client ceresdb.Client, ts int64) error {
7077
return fmt.Errorf("expect 2 rows, current: %+v", len(resp.Rows))
7178
}
7279

73-
if err := ensureRow([]ceresdb.Value{
80+
row0 := []ceresdb.Value{
7481
ceresdb.NewUint64Value(4024844655630594205),
7582
ceresdb.NewInt64Value(ts),
7683
ceresdb.NewStringValue("tag-0"),
77-
ceresdb.NewInt64Value(0),
78-
}, resp.Rows[0].Columns()); err != nil {
79-
return err
80-
}
84+
ceresdb.NewInt64Value(0)}
8185

82-
return ensureRow([]ceresdb.Value{
86+
row1 := []ceresdb.Value{
8387
ceresdb.NewUint64Value(14230010170561829440),
8488
ceresdb.NewInt64Value(ts),
8589
ceresdb.NewStringValue("tag-1"),
8690
ceresdb.NewInt64Value(1),
87-
}, resp.Rows[1].Columns())
91+
}
92+
93+
if addNewColumn {
94+
row0[0] = ceresdb.NewUint64Value(8341999341185504339)
95+
row1[0] = ceresdb.NewUint64Value(4452331151453582498)
96+
row0 = append(row0, ceresdb.NewInt64Value(0), ceresdb.NewStringValue("new-tag-0"))
97+
row1 = append(row1, ceresdb.NewInt64Value(1), ceresdb.NewStringValue("new-tag-1"))
98+
}
99+
100+
if err := ensureRow(row0,
101+
resp.Rows[0].Columns()); err != nil {
102+
return err
103+
}
104+
105+
return ensureRow(row1, resp.Rows[1].Columns())
88106
}
89107

90108
func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error) {
@@ -99,6 +117,48 @@ func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error)
99117
return resp.AffectedRows, nil
100118
}
101119

120+
func checkAutoCreateTable(ctx context.Context, client ceresdb.Client) error {
121+
if _, err := ddl(ctx, client, "drop table if exists "+table); err != nil {
122+
return err
123+
}
124+
125+
ts := currentMS()
126+
if err := write(ctx, client, ts, false); err != nil {
127+
return err
128+
}
129+
130+
if err := query(ctx, client, ts, false); err != nil {
131+
return err
132+
}
133+
134+
return nil
135+
}
136+
137+
func checkAutoAddColumns(ctx context.Context, client ceresdb.Client) error {
138+
ts := currentMS()
139+
if err := write(ctx, client, ts, true); err != nil {
140+
return err
141+
}
142+
143+
if err := query(ctx, client, ts, true); err != nil {
144+
return err
145+
}
146+
147+
return nil
148+
}
149+
150+
func dropTable(ctx context.Context, client ceresdb.Client) error {
151+
affected, err := ddl(ctx, client, "drop table "+table)
152+
if err != nil {
153+
return err
154+
}
155+
156+
if affected != 0 {
157+
panic(fmt.Sprintf("drop table expected 0, actual is %d", affected))
158+
}
159+
return nil
160+
}
161+
102162
func main() {
103163
fmt.Printf("Begin test, endpoint %s...\n", endpoint)
104164

@@ -110,28 +170,18 @@ func main() {
110170
}
111171

112172
ctx := context.TODO()
113-
if _, err := ddl(ctx, client, "drop table if exists "+table); err != nil {
114-
panic(err)
115-
}
116-
117-
ts := currentMS()
118-
if err := write(ctx, client, ts); err != nil {
173+
if err = checkAutoCreateTable(ctx, client); err != nil {
119174
panic(err)
120175
}
121176

122-
if err := query(ctx, client, ts); err != nil {
177+
if err = checkAutoAddColumns(ctx, client); err != nil {
123178
panic(err)
124179
}
125180

126-
affected, err := ddl(ctx, client, "drop table "+table)
127-
if err != nil {
181+
if err = dropTable(ctx, client); err != nil {
128182
panic(err)
129183
}
130184

131-
if affected != 0 {
132-
panic(fmt.Sprintf("drop table expected 0, actual is %d", affected))
133-
}
134-
135185
fmt.Println("Test done")
136186
}
137187

server/src/handlers/influxdb.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
context::RequestContext,
2727
handlers,
2828
instance::InstanceRef,
29-
proxy::grpc::write::{execute_plan, write_request_to_insert_plan, WriteContext},
29+
proxy::grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
3030
schema_config_provider::SchemaConfigProviderRef,
3131
};
3232

@@ -120,7 +120,7 @@ impl<Q: QueryExecutor + 'static> InfluxDb<Q> {
120120

121121
let mut success = 0;
122122
for insert_plan in plans {
123-
success += execute_plan(
123+
success += execute_insert_plan(
124124
request_id,
125125
catalog,
126126
schema,

server/src/handlers/prom.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::{
3030
context::RequestContext,
3131
handlers,
3232
instance::InstanceRef,
33-
proxy::grpc::write::{execute_plan, write_request_to_insert_plan, WriteContext},
33+
proxy::grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
3434
schema_config_provider::SchemaConfigProviderRef,
3535
};
3636

@@ -257,7 +257,7 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
257257

258258
let mut success = 0;
259259
for insert_plan in plans {
260-
success += execute_plan(
260+
success += execute_insert_plan(
261261
request_id,
262262
catalog,
263263
schema,

server/src/proxy/grpc/prom_query.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use common_types::{
2020
use common_util::error::BoxError;
2121
use http::StatusCode;
2222
use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output};
23-
use log::info;
23+
use log::{error, info};
2424
use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
2525
use snafu::{ensure, OptionExt, ResultExt};
2626
use sql::{
@@ -42,10 +42,13 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
4242
req: PrometheusQueryRequest,
4343
) -> PrometheusQueryResponse {
4444
match self.handle_prom_query_internal(ctx, req).await {
45-
Err(e) => PrometheusQueryResponse {
46-
header: Some(error::build_err_header(e)),
47-
..Default::default()
48-
},
45+
Err(e) => {
46+
error!("Failed to handle prom query, err:{e}");
47+
PrometheusQueryResponse {
48+
header: Some(error::build_err_header(e)),
49+
..Default::default()
50+
}
51+
}
4952
Ok(v) => v,
5053
}
5154
}

server/src/proxy/grpc/route.rs

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use ceresdbproto::storage::{RouteRequest, RouteResponse};
44
use common_util::error::BoxError;
55
use http::StatusCode;
6+
use log::error;
67
use query_engine::executor::Executor as QueryExecutor;
78
use snafu::ResultExt;
89

@@ -23,6 +24,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
2324
let mut resp = RouteResponse::default();
2425
match routes {
2526
Err(e) => {
27+
error!("Failed to handle route, err:{e}");
2628
resp.header = Some(error::build_err_header(e));
2729
}
2830
Ok(v) => {

server/src/proxy/grpc/sql_query.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,13 @@ const STREAM_QUERY_CHANNEL_LEN: usize = 20;
4040
impl<Q: QueryExecutor + 'static> Proxy<Q> {
4141
pub async fn handle_sql_query(&self, ctx: Context, req: SqlQueryRequest) -> SqlQueryResponse {
4242
match self.handle_sql_query_internal(ctx, req).await {
43-
Err(e) => SqlQueryResponse {
44-
header: Some(error::build_err_header(e)),
45-
..Default::default()
46-
},
43+
Err(e) => {
44+
error!("Failed to handle sql query, err:{e}");
45+
SqlQueryResponse {
46+
header: Some(error::build_err_header(e)),
47+
..Default::default()
48+
}
49+
}
4750
Ok(v) => v,
4851
}
4952
}
@@ -55,6 +58,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
5558
) -> BoxStream<'static, SqlQueryResponse> {
5659
match self.clone().handle_stream_query_internal(ctx, req).await {
5760
Err(e) => stream::once(async {
61+
error!("Failed to handle stream sql query, err:{e}");
5862
SqlQueryResponse {
5963
header: Some(error::build_err_header(e)),
6064
..Default::default()

0 commit comments

Comments
 (0)