-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test: add test for shard-group duration migration
- Loading branch information
Showing
3 changed files
with
178 additions
and
77 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
92 changes: 92 additions & 0 deletions
92
kv/migration/all/0015_record-shard-group-durations-in-bucket-metadata_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package all | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"testing" | ||
"time" | ||
|
||
"github.com/dustin/go-humanize" | ||
"github.com/influxdata/influxdb/v2" | ||
"github.com/influxdata/influxdb/v2/kv" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestMigration_ShardGroupDuration(t *testing.T) { | ||
ctx, cancelFunc := context.WithCancel(context.Background()) | ||
defer cancelFunc() | ||
|
||
// Run up to migration 14. | ||
ts := newService(t, ctx, 14) | ||
|
||
// Seed some buckets. | ||
buckets := []*influxdb.Bucket{ | ||
{ | ||
ID: influxdb.ID(1), | ||
Name: "infinite", | ||
OrgID: ts.Org.ID, | ||
RetentionPeriod: 0, | ||
}, | ||
{ | ||
ID: influxdb.ID(2), | ||
Name: "1w", | ||
OrgID: ts.Org.ID, | ||
RetentionPeriod: humanize.Week, | ||
}, | ||
{ | ||
ID: influxdb.ID(3), | ||
Name: "1d", | ||
OrgID: ts.Org.ID, | ||
RetentionPeriod: humanize.Day, | ||
}, | ||
{ | ||
ID: influxdb.ID(4), | ||
Name: "1h", | ||
OrgID: ts.Org.ID, | ||
RetentionPeriod: time.Hour, | ||
}, | ||
} | ||
|
||
bucketBucket := []byte("bucketsv1") | ||
ids := make([][]byte, len(buckets)) | ||
err := ts.Store.Update(context.Background(), func(tx kv.Tx) error { | ||
bkt, err := tx.Bucket(bucketBucket) | ||
require.NoError(t, err) | ||
for i, b := range buckets { | ||
js, err := json.Marshal(b) | ||
require.NoError(t, err) | ||
|
||
ids[i], err = b.ID.Encode() | ||
require.NoError(t, err) | ||
require.NoError(t, bkt.Put(ids[i], js)) | ||
} | ||
return nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
// Run the migration. | ||
require.NoError(t, Migration0015_RecordShardGroupDurationsInBucketMetadata.Up(context.Background(), ts.Store)) | ||
|
||
// Read the buckets back out of the store. | ||
migratedBuckets := make([]influxdb.Bucket, len(buckets)) | ||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error { | ||
bkt, err := tx.Bucket(bucketBucket) | ||
require.NoError(t, err) | ||
|
||
rawBuckets, err := bkt.GetBatch(ids...) | ||
require.NoError(t, err) | ||
|
||
for i, rawBucket := range rawBuckets { | ||
require.NoError(t, json.Unmarshal(rawBucket, &migratedBuckets[i])) | ||
} | ||
|
||
return nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
// Check that normalized shard-group durations were backfilled. | ||
require.Equal(t, humanize.Week, migratedBuckets[0].ShardGroupDuration) | ||
require.Equal(t, humanize.Day, migratedBuckets[1].ShardGroupDuration) | ||
require.Equal(t, time.Hour, migratedBuckets[2].ShardGroupDuration) | ||
require.Equal(t, time.Hour, migratedBuckets[3].ShardGroupDuration) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package all | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/benbjohnson/clock" | ||
"github.com/influxdata/influxdb/v2" | ||
"github.com/influxdata/influxdb/v2/authorization" | ||
"github.com/influxdata/influxdb/v2/inmem" | ||
"github.com/influxdata/influxdb/v2/kv" | ||
"github.com/influxdata/influxdb/v2/kv/migration" | ||
"github.com/influxdata/influxdb/v2/tenant" | ||
"go.uber.org/zap/zaptest" | ||
) | ||
|
||
type testService struct { | ||
Store kv.SchemaStore | ||
Service *kv.Service | ||
Org influxdb.Organization | ||
User influxdb.User | ||
Auth influxdb.Authorization | ||
Clock clock.Clock | ||
} | ||
|
||
func newService(t *testing.T, ctx context.Context, endMigration int) *testService { | ||
t.Helper() | ||
|
||
var ( | ||
ts = &testService{ | ||
Store: inmem.NewKVStore(), | ||
} | ||
logger = zaptest.NewLogger(t) | ||
) | ||
|
||
// apply migrations up to (but not including) this one | ||
migrator, err := migration.NewMigrator(logger, ts.Store, Migrations[:endMigration]...) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if err := migrator.Up(ctx); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
store := tenant.NewStore(ts.Store) | ||
tenantSvc := tenant.NewService(store) | ||
|
||
authStore, err := authorization.NewStore(ts.Store) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
authSvc := authorization.NewService(authStore, tenantSvc) | ||
|
||
ts.Service = kv.NewService(logger, ts.Store, tenantSvc) | ||
|
||
ts.User = influxdb.User{Name: t.Name() + "-user"} | ||
if err := tenantSvc.CreateUser(ctx, &ts.User); err != nil { | ||
t.Fatal(err) | ||
} | ||
ts.Org = influxdb.Organization{Name: t.Name() + "-org"} | ||
if err := tenantSvc.CreateOrganization(ctx, &ts.Org); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if err := tenantSvc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{ | ||
ResourceType: influxdb.OrgsResourceType, | ||
ResourceID: ts.Org.ID, | ||
UserID: ts.User.ID, | ||
UserType: influxdb.Owner, | ||
}); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
ts.Auth = influxdb.Authorization{ | ||
OrgID: ts.Org.ID, | ||
UserID: ts.User.ID, | ||
Permissions: influxdb.OperPermissions(), | ||
} | ||
if err := authSvc.CreateAuthorization(context.Background(), &ts.Auth); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
return ts | ||
} |