Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto add column #749

Merged
merged 21 commits into from
Mar 22, 2023
Merged
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 81 additions & 27 deletions integration_tests/sdk/go/main.go
Original file line number Diff line number Diff line change
@@ -19,14 +19,21 @@ func init() {
}
}

func write(ctx context.Context, client ceresdb.Client, ts int64) error {
func write(ctx context.Context, client ceresdb.Client, ts int64, addNewColumn bool) error {
points := make([]ceresdb.Point, 0, 2)
for i := 0; i < 2; i++ {
point, err := ceresdb.NewPointBuilder(table).
builder := ceresdb.NewPointBuilder(table).
SetTimestamp(ts).
AddTag("name", ceresdb.NewStringValue(fmt.Sprintf("tag-%d", i))).
AddField("value", ceresdb.NewInt64Value(int64(i))).
Build()
AddField("value", ceresdb.NewInt64Value(int64(i)))

if addNewColumn {
builder = builder.AddTag("new_tag", ceresdb.NewStringValue(fmt.Sprintf("new-tag-%d", i))).
AddField("new_field", ceresdb.NewInt64Value(int64(i)))
}

point, err := builder.Build()

if err != nil {
return err
}
@@ -57,10 +64,10 @@ func ensureRow(expectedVals []ceresdb.Value, actualRow []ceresdb.Column) error {

}

func query(ctx context.Context, client ceresdb.Client, ts int64) error {
func query(ctx context.Context, client ceresdb.Client, ts int64, addNewColumn bool) error {
resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{
Tables: []string{table},
SQL: fmt.Sprintf("select * from %s where timestamp = %d", table, ts),
SQL: fmt.Sprintf("select * from %s where timestamp = %d order by name", table, ts),
})
if err != nil {
return err
@@ -70,21 +77,32 @@ func query(ctx context.Context, client ceresdb.Client, ts int64) error {
return fmt.Errorf("expect 2 rows, current: %+v", len(resp.Rows))
}

if err := ensureRow([]ceresdb.Value{
row0 := []ceresdb.Value{
ceresdb.NewUint64Value(4024844655630594205),
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-0"),
ceresdb.NewInt64Value(0),
}, resp.Rows[0].Columns()); err != nil {
return err
}
ceresdb.NewInt64Value(0)}

return ensureRow([]ceresdb.Value{
row1 := []ceresdb.Value{
ceresdb.NewUint64Value(14230010170561829440),
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-1"),
ceresdb.NewInt64Value(1),
}, resp.Rows[1].Columns())
}

if addNewColumn {
row0[0] = ceresdb.NewUint64Value(8341999341185504339)
row1[0] = ceresdb.NewUint64Value(4452331151453582498)
row0 = append(row0, ceresdb.NewInt64Value(0), ceresdb.NewStringValue("new-tag-0"))
row1 = append(row1, ceresdb.NewInt64Value(1), ceresdb.NewStringValue("new-tag-1"))
}

if err := ensureRow(row0,
resp.Rows[0].Columns()); err != nil {
return err
}

return ensureRow(row1, resp.Rows[1].Columns())
}

func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error) {
@@ -99,6 +117,53 @@ func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error)
return resp.AffectedRows, nil
}

func checkAutoCreateTable(client ceresdb.Client) error {
ctx := context.TODO()
if _, err := ddl(ctx, client, "drop table if exists "+table); err != nil {
return err
}

ts := currentMS()
if err := write(ctx, client, ts, false); err != nil {
return err
}

if err := query(ctx, client, ts, false); err != nil {
return err
}

return nil
}

func checkAutoAddColumns(client ceresdb.Client) error {
ctx := context.TODO()

ts := currentMS()
if err := write(ctx, client, ts, true); err != nil {
return err
}

if err := query(ctx, client, ts, true); err != nil {
return err
}

return nil
}

func dropTable(client ceresdb.Client) error {
ctx := context.TODO()

affected, err := ddl(ctx, client, "drop table "+table)
if err != nil {
return err
}

if affected != 0 {
panic(fmt.Sprintf("drop table expected 0, actual is %d", affected))
}
return nil
}

func main() {
fmt.Printf("Begin test, endpoint %s...\n", endpoint)

@@ -109,29 +174,18 @@ func main() {
panic(err)
}

ctx := context.TODO()
if _, err := ddl(ctx, client, "drop table if exists "+table); err != nil {
panic(err)
}

ts := currentMS()
if err := write(ctx, client, ts); err != nil {
if err = checkAutoCreateTable(client); err != nil {
panic(err)
}

if err := query(ctx, client, ts); err != nil {
if err = checkAutoAddColumns(client); err != nil {
panic(err)
}

affected, err := ddl(ctx, client, "drop table "+table)
if err != nil {
if err = dropTable(client); err != nil {
panic(err)
}

if affected != 0 {
panic(fmt.Sprintf("drop table expected 0, actual is %d", affected))
}

fmt.Println("Test done")
}

4 changes: 2 additions & 2 deletions server/src/handlers/influxdb.rs
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ use crate::{
context::RequestContext,
handlers,
instance::InstanceRef,
proxy::grpc::write::{execute_plan, write_request_to_insert_plan, WriteContext},
proxy::grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
schema_config_provider::SchemaConfigProviderRef,
};

@@ -120,7 +120,7 @@ impl<Q: QueryExecutor + 'static> InfluxDb<Q> {

let mut success = 0;
for insert_plan in plans {
success += execute_plan(
success += execute_insert_plan(
request_id,
catalog,
schema,
4 changes: 2 additions & 2 deletions server/src/handlers/prom.rs
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ use crate::{
context::RequestContext,
handlers,
instance::InstanceRef,
proxy::grpc::write::{execute_plan, write_request_to_insert_plan, WriteContext},
proxy::grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
schema_config_provider::SchemaConfigProviderRef,
};

@@ -257,7 +257,7 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {

let mut success = 0;
for insert_plan in plans {
success += execute_plan(
success += execute_insert_plan(
request_id,
catalog,
schema,
Loading