Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: alter partition table tag column #1304

Merged
merged 5 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ jobs:
- uses: actions/checkout@v3
with:
submodules: true
- uses: actions/setup-go@v3
with:
go-version: 1.21
- run: |
rustup set auto-self-update disable
rustup toolchain install ${RUST_VERSION} --profile minimal
Expand All @@ -169,11 +172,12 @@ jobs:
run: |
sudo apt update
sudo apt install --yes protobuf-compiler
- name: Build and Run CeresDB
- name: Build and Run CeresDB-Cluster
working-directory: integration_tests
run: |
make build-debug
nohup ./target/debug/ceresdb-server -c docs/minimal.toml > /tmp/ceresdb-stdout.log &
sleep 10
make prepare
make run-ceresmeta
make run-ceresdb-cluster
- name: Run Go SDK tests
working-directory: integration_tests
run: |
Expand Down
17 changes: 15 additions & 2 deletions integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,25 @@ build-test:

build: build-ceresdb build-test

kill-old-process:
kill-old-ceresmeta:
killall ceresmeta-server | true

kill-old-ceresdb:
killall ceresdb-server | true
killall ceresmeta | true

kill-old-process: kill-old-ceresmeta kill-old-ceresdb

prepare: clean build kill-old-process

run-ceresmeta: build-meta
nohup $(CERESMETA_BINARY_PATH) --config ${CERESMETA_CONFIG_PATH} > /tmp/ceresmeta-stdout.log 2>&1 &
sleep 10

run-ceresdb-cluster: build-ceresdb
nohup ${CERESDB_BINARY_PATH} --config ${CERESDB_CONFIG_FILE_0} > ${CLUSTER_CERESDB_STDOUT_FILE_0} 2>&1 &
nohup ${CERESDB_BINARY_PATH} --config ${CERESDB_CONFIG_FILE_1} > ${CLUSTER_CERESDB_STDOUT_FILE_1} 2>&1 &
sleep 30

run: prepare build-meta
$(CERESDB_TEST_BINARY)

Expand Down
4 changes: 2 additions & 2 deletions integration_tests/config/ceresdb-cluster-1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ level = "debug"

[server]
bind_addr = "0.0.0.0"
http_port = 15440
grpc_port = 18831
http_port = 5441
grpc_port = 8832
mysql_port = 13307
postgresql_port = 15433
deploy_mode = "Cluster"
Expand Down
193 changes: 193 additions & 0 deletions integration_tests/sdk/go/alteraddcolumn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package main

import (
"context"
"fmt"

"github.com/CeresDB/ceresdb-client-go/ceresdb"
)

const fieldName = "b"
const tagName = "btag"
const timestampName = "t"

func checkPartitionTableAddColumn(ctx context.Context, client ceresdb.Client) error {
err := dropTable(ctx, client, partitionTable)
if err != nil {
return err
}

_, err = ddl(ctx, client, partitionTable, fmt.Sprintf(
"CREATE TABLE `%s`( "+
"`name`string TAG,"+
"`id` int TAG,"+
"`value` int64 NOT NULL,"+
"`t` timestamp NOT NULL,"+
"TIMESTAMP KEY(t)) "+
"PARTITION BY KEY(name) PARTITIONS 4 ENGINE = Analytic", partitionTable))
if err != nil {
return err
}

_, err = ddl(ctx, client, partitionTable, fmt.Sprintf("ALTER TABLE `%s` ADD COLUMN (%s string);", partitionTable, fieldName))
if err != nil {
return err
}

ts := currentMS()

// First write will fail, because the schema is not updated yet.
// Currently, write failed will update the schema.
err = writePartitionTableNewField(ctx, client, ts, fieldName)
if err == nil {
panic("first write should fail")
}

if err := writePartitionTableNewField(ctx, client, ts, fieldName); err != nil {
return err
}

_, err = ddl(ctx, client, partitionTable, fmt.Sprintf("ALTER TABLE `%s` ADD COLUMN (%s string TAG);", partitionTable, tagName))
if err != nil {
return err
}

// First write will fail, because the schema is not updated yet.
// Currently, write failed will update the schema.
err = writePartitionTableNewTag(ctx, client, ts, tagName)
if err == nil {
panic("first write should fail")
}

if err := writePartitionTableNewTag(ctx, client, ts, tagName); err != nil {
return err
}

if err := queryPartitionTable(ctx, client, ts, timestampName); err != nil {
return err
}

return nil
}

func writePartitionTableNewField(ctx context.Context, client ceresdb.Client, ts int64, fieldName string) error {
points := make([]ceresdb.Point, 0, 2)
for i := 0; i < 2; i++ {
builder := ceresdb.NewPointBuilder(partitionTable).
SetTimestamp(ts).
AddTag("name", ceresdb.NewStringValue(fmt.Sprintf("tag-%d", i))).
AddField("value", ceresdb.NewInt64Value(int64(i))).
AddField(fieldName, ceresdb.NewStringValue("ss"))

point, err := builder.Build()

if err != nil {
return err
}
points = append(points, point)
}

resp, err := client.Write(ctx, ceresdb.WriteRequest{
Points: points,
})
if err != nil {
return err
}

if resp.Success != 2 {
return fmt.Errorf("write failed, resp: %+v", resp)
}
return nil
}

func writePartitionTableNewTag(ctx context.Context, client ceresdb.Client, ts int64, tagName string) error {
points := make([]ceresdb.Point, 0, 2)
for i := 0; i < 2; i++ {
builder := ceresdb.NewPointBuilder(partitionTable).
SetTimestamp(ts).
AddTag("name", ceresdb.NewStringValue(fmt.Sprintf("tag-%d", i))).
AddField("value", ceresdb.NewInt64Value(int64(i))).
AddTag(tagName, ceresdb.NewStringValue("sstag")).
AddField(fieldName, ceresdb.NewStringValue("ss"))

point, err := builder.Build()

if err != nil {
return err
}
points = append(points, point)
}

resp, err := client.Write(ctx, ceresdb.WriteRequest{
Points: points,
})
if err != nil {
return err
}

if resp.Success != 2 {
return fmt.Errorf("write failed, resp: %+v", resp)
}
return nil
}

func queryPartitionTable(ctx context.Context, client ceresdb.Client, ts int64, timestampName string) error {
sql := fmt.Sprintf("select t, name, value,%s,%s from %s where %s = %d order by name,%s", fieldName, tagName, partitionTable, timestampName, ts, tagName)

resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{
Tables: []string{partitionTable},
SQL: sql,
})
if err != nil {
return err
}

if len(resp.Rows) != 4 {
return fmt.Errorf("expect 2 rows, current: %+v", len(resp.Rows))
}

row0 := []ceresdb.Value{
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-0"),
ceresdb.NewInt64Value(0),
ceresdb.NewStringValue("ss"),
ceresdb.NewStringValue("sstag"),
}

row1 := []ceresdb.Value{
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-0"),
ceresdb.NewInt64Value(0),
ceresdb.NewStringValue("ss"),
}

row2 := []ceresdb.Value{
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-1"),
ceresdb.NewInt64Value(1),
ceresdb.NewStringValue("ss"),
ceresdb.NewStringValue("sstag"),
}

row3 := []ceresdb.Value{
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-1"),
ceresdb.NewInt64Value(1),
ceresdb.NewStringValue("ss"),
}

if err := ensureRow(row0,
resp.Rows[0].Columns()); err != nil {
return err
}
if err := ensureRow(row1,
resp.Rows[1].Columns()); err != nil {
return err
}
if err := ensureRow(row2,
resp.Rows[2].Columns()); err != nil {
return err
}

return ensureRow(row3, resp.Rows[3].Columns())
}
2 changes: 1 addition & 1 deletion integration_tests/sdk/go/autocreatetable.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func checkAutoAddColumns(ctx context.Context, client ceresdb.Client) error {
timestampName := "timestamp"
err := dropTable(ctx, client)
err := dropTable(ctx, client, table)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions integration_tests/sdk/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ module go-sdk-test

go 1.17

require github.com/CeresDB/ceresdb-client-go v1.1.0
require github.com/CeresDB/ceresdb-client-go v1.1.3

require (
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230228090856-37ba6214b131 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/flatbuffers v2.0.0+incompatible // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/klauspost/compress v1.15.14 // indirect
github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions integration_tests/sdk/go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CeresDB/ceresdb-client-go v1.1.0 h1:iCBb03OubOA7NCLSiYBis9lMaR3nu6s0wipUulEeonU=
github.com/CeresDB/ceresdb-client-go v1.1.0/go.mod h1:bZneg5VvvfSZhCnlKO0SjIoPY+fLKnFwhPb3gijLorE=
github.com/CeresDB/ceresdb-client-go v1.1.3 h1:Qm01ekHSSWfuAjSyMtuGYs5L2UlKl6SIXVWYHpcH5GY=
github.com/CeresDB/ceresdb-client-go v1.1.3/go.mod h1:Wg7MC22gqth8rpmJiQW85meo8UbPZDjzWMgVvWfP08w=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230228090856-37ba6214b131 h1:ePzWeIoLOT4FsdjNrmEVrrtm412H2xHM6ZNLamB0bqk=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230228090856-37ba6214b131/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
Expand Down Expand Up @@ -783,6 +785,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -822,6 +826,7 @@ github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/sdk/go/issue-779.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
func checkAutoAddColumnsWithCreateTable(ctx context.Context, client ceresdb.Client) error {
timestampName := "timestamp"

err := dropTable(ctx, client)
err := dropTable(ctx, client, table)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions integration_tests/sdk/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func main() {
panic(err)
}

if err = checkPartitionTableAddColumn(ctx, client); err != nil {
panic(err)
}

fmt.Println("Test done")
}

Expand Down
11 changes: 6 additions & 5 deletions integration_tests/sdk/go/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
)

const table = "godemo"
const partitionTable = "godemoPartition"

func createTable(ctx context.Context, client ceresdb.Client, timestampName string) error {
_, err := ddl(ctx, client, fmt.Sprintf("create table %s (`%s` timestamp not null, name string tag, value int64,TIMESTAMP KEY(%s))", table, timestampName, timestampName))
_, err := ddl(ctx, client, table, fmt.Sprintf("create table %s (`%s` timestamp not null, name string tag, value int64,TIMESTAMP KEY(%s))", table, timestampName, timestampName))
return err
}

Expand Down Expand Up @@ -100,9 +101,9 @@ func query(ctx context.Context, client ceresdb.Client, ts int64, timestampName s
return ensureRow(row1, resp.Rows[1].Columns())
}

func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error) {
func ddl(ctx context.Context, client ceresdb.Client, tableName string, sql string) (uint32, error) {
resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{
Tables: []string{table},
Tables: []string{tableName},
SQL: sql,
})
if err != nil {
Expand Down Expand Up @@ -138,8 +139,8 @@ func writeAndQueryWithNewColumns(ctx context.Context, client ceresdb.Client, tim
return nil
}

func dropTable(ctx context.Context, client ceresdb.Client) error {
affected, err := ddl(ctx, client, "drop table if exists "+table)
func dropTable(ctx context.Context, client ceresdb.Client, table string) error {
affected, err := ddl(ctx, client, table, "drop table if exists "+table)
if err != nil {
return err
}
Expand Down
Loading