Skip to content

Commit

Permalink
test: add test for shard-group duration migration
Browse files Browse the repository at this point in the history
  • Loading branch information
danxmoran committed Mar 11, 2021
1 parent 69c1df7 commit eff0ad5
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 77 deletions.
78 changes: 1 addition & 77 deletions kv/migration/all/0003_task_owners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,15 @@ import (
"fmt"
"testing"

"github.com/benbjohnson/clock"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorization"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
"github.com/influxdata/influxdb/v2/tenant"
"go.uber.org/zap/zaptest"
)

func Test_(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

ts := newService(t, ctx)
ts := newService(t, ctx, 2)

taskBucket := []byte("tasksv1")
id := "05da585043e02000"
Expand Down Expand Up @@ -86,73 +80,3 @@ func Test_(t *testing.T) {
t.Fatal("failed to fill in ownerID")
}
}

type testService struct {
Store kv.SchemaStore
Service *kv.Service
Org influxdb.Organization
User influxdb.User
Auth influxdb.Authorization
Clock clock.Clock
}

func newService(t *testing.T, ctx context.Context) *testService {
t.Helper()

var (
ts = &testService{
Store: inmem.NewKVStore(),
}
logger = zaptest.NewLogger(t)
)

// apply migrations up to (but not including) this one
migrator, err := migration.NewMigrator(logger, ts.Store, Migrations[:2]...)
if err != nil {
t.Fatal(err)
}

if err := migrator.Up(ctx); err != nil {
t.Fatal(err)
}

store := tenant.NewStore(ts.Store)
tenantSvc := tenant.NewService(store)

authStore, err := authorization.NewStore(ts.Store)
if err != nil {
t.Fatal(err)
}
authSvc := authorization.NewService(authStore, tenantSvc)

ts.Service = kv.NewService(logger, ts.Store, tenantSvc)

ts.User = influxdb.User{Name: t.Name() + "-user"}
if err := tenantSvc.CreateUser(ctx, &ts.User); err != nil {
t.Fatal(err)
}
ts.Org = influxdb.Organization{Name: t.Name() + "-org"}
if err := tenantSvc.CreateOrganization(ctx, &ts.Org); err != nil {
t.Fatal(err)
}

if err := tenantSvc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
ResourceType: influxdb.OrgsResourceType,
ResourceID: ts.Org.ID,
UserID: ts.User.ID,
UserType: influxdb.Owner,
}); err != nil {
t.Fatal(err)
}

ts.Auth = influxdb.Authorization{
OrgID: ts.Org.ID,
UserID: ts.User.ID,
Permissions: influxdb.OperPermissions(),
}
if err := authSvc.CreateAuthorization(context.Background(), &ts.Auth); err != nil {
t.Fatal(err)
}

return ts
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package all

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/dustin/go-humanize"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/stretchr/testify/require"
)

func TestMigration_ShardGroupDuration(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

// Run up to migration 14.
ts := newService(t, ctx, 14)

// Seed some buckets.
buckets := []*influxdb.Bucket{
{
ID: influxdb.ID(1),
Name: "infinite",
OrgID: ts.Org.ID,
RetentionPeriod: 0,
},
{
ID: influxdb.ID(2),
Name: "1w",
OrgID: ts.Org.ID,
RetentionPeriod: humanize.Week,
},
{
ID: influxdb.ID(3),
Name: "1d",
OrgID: ts.Org.ID,
RetentionPeriod: humanize.Day,
},
{
ID: influxdb.ID(4),
Name: "1h",
OrgID: ts.Org.ID,
RetentionPeriod: time.Hour,
},
}

bucketBucket := []byte("bucketsv1")
ids := make([][]byte, len(buckets))
err := ts.Store.Update(context.Background(), func(tx kv.Tx) error {
bkt, err := tx.Bucket(bucketBucket)
require.NoError(t, err)
for i, b := range buckets {
js, err := json.Marshal(b)
require.NoError(t, err)

ids[i], err = b.ID.Encode()
require.NoError(t, err)
require.NoError(t, bkt.Put(ids[i], js))
}
return nil
})
require.NoError(t, err)

// Run the migration.
require.NoError(t, Migration0015_RecordShardGroupDurationsInBucketMetadata.Up(context.Background(), ts.Store))

// Read the buckets back out of the store.
migratedBuckets := make([]influxdb.Bucket, len(buckets))
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
bkt, err := tx.Bucket(bucketBucket)
require.NoError(t, err)

rawBuckets, err := bkt.GetBatch(ids...)
require.NoError(t, err)

for i, rawBucket := range rawBuckets {
require.NoError(t, json.Unmarshal(rawBucket, &migratedBuckets[i]))
}

return nil
})
require.NoError(t, err)

// Check that normalized shard-group durations were backfilled.
require.Equal(t, humanize.Week, migratedBuckets[0].ShardGroupDuration)
require.Equal(t, humanize.Day, migratedBuckets[1].ShardGroupDuration)
require.Equal(t, time.Hour, migratedBuckets[2].ShardGroupDuration)
require.Equal(t, time.Hour, migratedBuckets[3].ShardGroupDuration)
}
85 changes: 85 additions & 0 deletions kv/migration/all/test_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package all

import (
"context"
"testing"

"github.com/benbjohnson/clock"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorization"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
"github.com/influxdata/influxdb/v2/tenant"
"go.uber.org/zap/zaptest"
)

type testService struct {
Store kv.SchemaStore
Service *kv.Service
Org influxdb.Organization
User influxdb.User
Auth influxdb.Authorization
Clock clock.Clock
}

func newService(t *testing.T, ctx context.Context, endMigration int) *testService {
t.Helper()

var (
ts = &testService{
Store: inmem.NewKVStore(),
}
logger = zaptest.NewLogger(t)
)

// apply migrations up to (but not including) this one
migrator, err := migration.NewMigrator(logger, ts.Store, Migrations[:endMigration]...)
if err != nil {
t.Fatal(err)
}

if err := migrator.Up(ctx); err != nil {
t.Fatal(err)
}

store := tenant.NewStore(ts.Store)
tenantSvc := tenant.NewService(store)

authStore, err := authorization.NewStore(ts.Store)
if err != nil {
t.Fatal(err)
}
authSvc := authorization.NewService(authStore, tenantSvc)

ts.Service = kv.NewService(logger, ts.Store, tenantSvc)

ts.User = influxdb.User{Name: t.Name() + "-user"}
if err := tenantSvc.CreateUser(ctx, &ts.User); err != nil {
t.Fatal(err)
}
ts.Org = influxdb.Organization{Name: t.Name() + "-org"}
if err := tenantSvc.CreateOrganization(ctx, &ts.Org); err != nil {
t.Fatal(err)
}

if err := tenantSvc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
ResourceType: influxdb.OrgsResourceType,
ResourceID: ts.Org.ID,
UserID: ts.User.ID,
UserType: influxdb.Owner,
}); err != nil {
t.Fatal(err)
}

ts.Auth = influxdb.Authorization{
OrgID: ts.Org.ID,
UserID: ts.User.ID,
Permissions: influxdb.OperPermissions(),
}
if err := authSvc.CreateAuthorization(context.Background(), &ts.Auth); err != nil {
t.Fatal(err)
}

return ts
}

0 comments on commit eff0ad5

Please sign in to comment.