diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index 8b9819382ae..a705a0afd4e 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -10,7 +10,6 @@ import ( "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/cmd/influxd/launcher" "github.com/influxdata/influxdb/v2/cmd/influxd/upgrade" - _ "github.com/influxdata/influxdb/v2/query/builtin" _ "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" _ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" "github.com/spf13/cobra" @@ -33,7 +32,6 @@ func main() { // FIXME //generate.Command, //restore.Command, - upgrade.Command, &cobra.Command{ Use: "version", Short: "Print the influxd server version", @@ -43,6 +41,9 @@ func main() { }, ) + // upgrade binds options to env variables, so it must be added after rootCmd is initialized + rootCmd.AddCommand(upgrade.NewCommand()) + if err := rootCmd.Execute(); err != nil { os.Exit(1) } diff --git a/cmd/influxd/upgrade/database.go b/cmd/influxd/upgrade/database.go new file mode 100644 index 00000000000..d641b0adeb8 --- /dev/null +++ b/cmd/influxd/upgrade/database.go @@ -0,0 +1,17 @@ +package upgrade + +import ( + "context" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/errors" + "go.uber.org/zap" +) + +func setupAdmin(ctx context.Context, v2 *influxDBv2) (*influxdb.OnboardingResults, error) { + return nil, errors.New("not implemented") +} + +// upgradeDatabases creates databases, buckets, retention policies and shard info according to 1.x meta and copies data +func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, orgID influxdb.ID, log *zap.Logger) (map[string][]string, error) { + return nil, errors.New("not implemented") +} diff --git a/cmd/influxd/upgrade/generate_security_script.go b/cmd/influxd/upgrade/generate_security_script.go new file mode 100644 index 00000000000..56653b18454 --- /dev/null +++ b/cmd/influxd/upgrade/generate_security_script.go @@ -0,0 +1,11 @@ +package upgrade + +import ( + "errors" + "go.uber.org/zap" +) + +// Generates security upgrade script. +func generateSecurityScript(v1 *influxDBv1, dbBuckets map[string][]string, log *zap.Logger) error { + return errors.New("not implemented") +} diff --git a/cmd/influxd/upgrade/upgrade.go b/cmd/influxd/upgrade/upgrade.go index 5fe318cfcd8..2e38bbf3e4a 100644 --- a/cmd/influxd/upgrade/upgrade.go +++ b/cmd/influxd/upgrade/upgrade.go @@ -2,21 +2,25 @@ package upgrade import ( "context" + "errors" "fmt" + "io/ioutil" "os" "os/user" "path/filepath" + "runtime" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/authorizer" "github.com/influxdata/influxdb/v2/bolt" "github.com/influxdata/influxdb/v2/dbrp" "github.com/influxdata/influxdb/v2/internal/fs" + "github.com/influxdata/influxdb/v2/kit/cli" "github.com/influxdata/influxdb/v2/kit/metric" "github.com/influxdata/influxdb/v2/kit/prom" "github.com/influxdata/influxdb/v2/kv" "github.com/influxdata/influxdb/v2/kv/migration" "github.com/influxdata/influxdb/v2/kv/migration/all" + "github.com/influxdata/influxdb/v2/storage" "github.com/influxdata/influxdb/v2/tenant" "github.com/influxdata/influxdb/v2/v1/services/meta" "github.com/influxdata/influxdb/v2/v1/services/meta/filestore" @@ -24,18 +28,49 @@ import ( "go.uber.org/zap" ) -var Command = &cobra.Command{ - Use: "upgrade", - Short: "Upgrade a 1.x version of InfluxDB", - RunE: runUpgradeE, +// Simplified 1.x config. +type configV1 struct { + Meta struct { + Dir string `toml:"dir"` + } `toml:"meta"` + Data struct { + Dir string `toml:"dir"` + WALDir string `toml:"wal-dir"` + } `toml:"data"` } type optionsV1 struct { metaDir string + walDir string + dataDir string + // cmd option + dbDir string + configFile string +} + +func (o *optionsV1) checkDirs() error { + if o.metaDir == "" || o.dataDir == "" || o.walDir == "" { + if o.dbDir == "" { + return errors.New("source directory not specified") + } else { + o.metaDir = filepath.Join(o.dbDir, "meta") + o.dataDir = filepath.Join(o.dbDir, "data") + o.walDir = filepath.Join(o.dbDir, "wal") + } + } + return nil } type optionsV2 struct { - boltPath string + boltPath string + enginePath string + userName string + password string + orgName string + bucket string + token string + retention string + securityScriptPath string } var options = struct { @@ -44,10 +79,14 @@ var options = struct { // flags for target InfluxDB target optionsV2 + + // verbose output + verbose bool + + logPath string }{} -func init() { - flags := Command.Flags() +func NewCommand() *cobra.Command { // source flags v1dir, err := influxDirV1() @@ -55,19 +94,134 @@ func init() { panic("error fetching default InfluxDB 1.x dir: " + err.Error()) } - flags.StringVar(&options.source.metaDir, "v1-meta-dir", filepath.Join(v1dir, "meta"), "Path to 1.x meta.db directory") - // target flags v2dir, err := fs.InfluxDir() if err != nil { panic("error fetching default InfluxDB 2.0 dir: " + err.Error()) } - flags.StringVar(&options.target.boltPath, "v2-bolt-path", filepath.Join(v2dir, "influxd.bolt"), "Path to 2.0 metadata") + // os-specific + var defaultSsPath string + if runtime.GOOS == "windows" { + defaultSsPath = filepath.Join(homeOrAnyDir(), "influxd-upgrade-security.cmd") + } else { + defaultSsPath = filepath.Join(homeOrAnyDir(), "influxd-upgrade-security.sh") + } + + cmd := &cobra.Command{ + Use: "upgrade", + Short: "Upgrade a 1.x version of InfluxDB", + Long: ` + Upgrades a 1.x version of InfluxDB by performing following actions: + 1. Reads 1.x config file and creates 2.x config file with matching options. Unsupported 1.x options are reported. + 2. Copies and upgrades 1.x database files. + 3. Creates a script for creating 1.x users and their permissions. This scripts needs to be revised and run manually after starting 2.x. + + If config file is not available, 1.x db folder (--v1-dir options) is taken as an input. + Target 2.x database dir is specified by the --engine-path option. If changed, bolt path should to be changed as well. +`, + RunE: runUpgradeE, + } + opts := []cli.Opt{ + { + DestP: &options.source.dbDir, + Flag: "v1-dir", + Default: v1dir, + Desc: "path to source 1.x db directory containing meta,data and wal sub-folders", + }, + { + DestP: &options.verbose, + Flag: "verbose", + Default: true, + Desc: "verbose output", + Short: 'v', + }, + { + DestP: &options.target.boltPath, + Flag: "bolt-path", + Default: filepath.Join(v2dir, bolt.DefaultFilename), + Desc: "path for boltdb database", + Short: 'm', + }, + { + DestP: &options.target.enginePath, + Flag: "engine-path", + Default: filepath.Join(v2dir, "engine"), + Desc: "path for persistent engine files", + Short: 'e', + }, + { + DestP: &options.target.userName, + Flag: "username", + Default: "", + Desc: "primary username", + Short: 'u', + Required: true, + }, + { + DestP: &options.target.password, + Flag: "password", + Default: "", + Desc: "password for username", + Short: 'p', + Required: true, + }, + { + DestP: &options.target.orgName, + Flag: "org", + Default: "", + Desc: "primary organization name", + Short: 'o', + Required: true, + }, + { + DestP: &options.target.bucket, + Flag: "bucket", + Default: "", + Desc: "primary bucket name", + Short: 'b', + Required: true, + }, + { + DestP: &options.target.retention, + Flag: "retention", + Default: "", + Desc: "optional: duration bucket will retain data. 0 is infinite. Default is 0.", + Short: 'r', + }, + { + DestP: &options.target.token, + Flag: "token", + Default: "", + Desc: "optional: token for username, else auto-generated", + Short: 't', + }, + { + DestP: &options.source.configFile, + Flag: "config-file", + Default: "/etc/influxdb/influxdb.conf", + Desc: "optional: Custom InfluxDB 1.x config file path, else the default config file", + }, + { + DestP: &options.target.securityScriptPath, + Flag: "security-script", + Default: defaultSsPath, + Desc: "optional: generated security upgrade script path", + }, + { + DestP: &options.logPath, + Flag: "log-path", + Default: filepath.Join(homeOrAnyDir(), "upgrade.log"), + Desc: "optional: custom log file path", + }, + } + + cli.BindOptions(cmd, opts) // add sub commands - Command.AddCommand(v1DumpMetaCommand) - Command.AddCommand(v2DumpMetaCommand) + cmd.AddCommand(v1DumpMetaCommand) + cmd.AddCommand(v2DumpMetaCommand) + return cmd } type influxDBv1 struct { @@ -75,41 +229,143 @@ type influxDBv1 struct { } type influxDBv2 struct { + log *zap.Logger boltClient *bolt.Client store *bolt.KVStore kvStore kv.SchemaStore tenantStore *tenant.Store ts *tenant.Service dbrpSvc influxdb.DBRPMappingServiceV2 + bucketSvc influxdb.BucketService onboardSvc influxdb.OnboardingService kvService *kv.Service meta *meta.Client } -func runUpgradeE(cmd *cobra.Command, args []string) error { +func (i *influxDBv2) close() error { + err := i.meta.Close() + if err != nil { + return err + } + err = i.boltClient.Close() + if err != nil { + return err + } + err = i.store.Close() + if err != nil { + return err + } + return nil +} + +func runUpgradeE(*cobra.Command, []string) error { ctx := context.Background() + config := zap.NewProductionConfig() + config.OutputPaths = append(config.OutputPaths, options.logPath) + config.ErrorOutputPaths = append(config.ErrorOutputPaths, options.logPath) + log, err := config.Build() + if err != nil { + return err + } + log.Info("starting InfluxDB 1.x upgrade") + + checkParam := func(name, value string) error { + if value == "" { + return fmt.Errorf("empty or missing mandatory option %s", name) + } + return nil + } + + if err := checkParam("username", options.target.userName); err != nil { + return err + } + if err := checkParam("password", options.target.password); err != nil { + return err + } + if err := checkParam("org", options.target.orgName); err != nil { + return err + } + if err := checkParam("bucket", options.target.bucket); err != nil { + return err + } + + if options.source.configFile != "" { + log.Info("upgrading config file", zap.String("file", options.source.configFile)) + if _, err := os.Stat(options.source.configFile); err != nil { + return err + } + v1Config, err := upgradeConfig(options.source.configFile, options.target, log) + if err != nil { + return err + } + options.source.metaDir = v1Config.Meta.Dir + options.source.dataDir = v1Config.Data.Dir + options.source.walDir = v1Config.Data.WALDir + } else { + log.Info("no InfluxDB 1.x config file specified, skipping its upgrade") + } + + if err := options.source.checkDirs(); err != nil { + return err + } + + log.Info("source paths", zap.String("meta", options.source.metaDir), zap.String("data", options.source.dataDir)) + log.Info("target paths", zap.String("bolt", options.target.boltPath), zap.String("engine", options.target.enginePath)) + + if fi, err := os.Stat(options.target.enginePath); err == nil { + if !fi.IsDir() { + return fmt.Errorf("engine path '%s' is not directory", options.target.enginePath) + } + entries, err := ioutil.ReadDir(options.target.enginePath) + if err != nil { + return err + } + if len(entries) > 0 { + return fmt.Errorf("target engine path '%s' must be empty", options.target.enginePath) + } + } v1, err := newInfluxDBv1(&options.source) if err != nil { return err } - _ = v1 - v2, err := newInfluxDBv2(ctx, &options.target) + v2, err := newInfluxDBv2(ctx, &options.target, log) + if err != nil { + return err + } + + defer func() { + if err := v2.close(); err != nil { + log.Error("2.x services closing error", zap.Error(err)) + } + }() + + or, err := setupAdmin(ctx, v2) + if err != nil { + return err + } + options.target.token = or.Auth.Token + + db2BucketIds, err := upgradeDatabases(ctx, v1, v2, or.Org.ID, log) if err != nil { + //remove all files + log.Info("database upgrade error, removing data") + if e := os.Remove(options.target.boltPath); e != nil { + log.Error("cleaning failed", zap.Error(e)) + } + + if e := os.RemoveAll(options.target.enginePath); e != nil { + log.Error("cleaning failed", zap.Error(e)) + } return err } - _ = v2 - // 1. Onboard the initial admin user - // v2.onboardSvc.OnboardInitialUser() + if err = generateSecurityScript(v1, db2BucketIds, log); err != nil { + return err + } - // 2. read each database / retention policy from v1.meta and create bucket db-name/rp-name - // newBucket := v2.ts.CreateBucket(ctx, Bucket{}) - // - // 3. create database in v2.meta - // v2.meta.CreateDatabase(newBucket.ID.String()) - // copy shard info from v1.meta + log.Info("upgrade successfully completed. Start service now") return nil } @@ -124,11 +380,11 @@ func newInfluxDBv1(opts *optionsV1) (svc *influxDBv1, err error) { return svc, nil } -func newInfluxDBv2(ctx context.Context, opts *optionsV2) (svc *influxDBv2, err error) { - log := zap.NewNop() +func newInfluxDBv2(ctx context.Context, opts *optionsV2, log *zap.Logger) (svc *influxDBv2, err error) { reg := prom.NewRegistry(log.With(zap.String("service", "prom_registry"))) svc = &influxDBv2{} + svc.log = log // ********************* // V2 specific services @@ -179,8 +435,16 @@ func newInfluxDBv2(ctx context.Context, opts *optionsV2) (svc *influxDBv2, err e } // DB/RP service - svc.dbrpSvc = dbrp.NewService(ctx, authorizer.NewBucketService(svc.ts.BucketService), svc.kvStore) + svc.dbrpSvc = dbrp.NewService(ctx, svc.ts.BucketService, svc.kvStore) + svc.bucketSvc = svc.ts.BucketService + engine := storage.NewEngine( + opts.enginePath, + storage.NewConfig(), + storage.WithMetaClient(svc.meta), + ) + + svc.ts.BucketService = storage.NewBucketService(svc.ts.BucketService, engine) // on-boarding service (influx setup) svc.onboardSvc = tenant.NewOnboardService(svc.ts, authSvc) @@ -219,3 +483,25 @@ func influxDirV1() (string, error) { return dir, nil } + +// homeOrAnyDir retrieves user's home directory, current working one or just none. +func homeOrAnyDir() string { + var dir string + u, err := user.Current() + if err == nil { + dir = u.HomeDir + } else if home := os.Getenv("HOME"); home != "" { + dir = home + } else if home := os.Getenv("USERPROFILE"); home != "" { + dir = home + } else { + wd, err := os.Getwd() + if err != nil { + dir = "" + } else { + dir = wd + } + } + + return dir +} diff --git a/cmd/influxd/upgrade/upgrade_config.go b/cmd/influxd/upgrade/upgrade_config.go new file mode 100644 index 00000000000..86935b8a9b9 --- /dev/null +++ b/cmd/influxd/upgrade/upgrade_config.go @@ -0,0 +1,12 @@ +package upgrade + +import ( + "errors" + + "go.uber.org/zap" +) + +// Backups existing config file and updates it with upgraded config. +func upgradeConfig(configFile string, targetOptions optionsV2, log *zap.Logger) (*configV1, error) { + return nil, errors.New("not implemented") +} diff --git a/cmd/influxd/upgrade/v1_dump_meta.go b/cmd/influxd/upgrade/v1_dump_meta.go index 172db1cb87d..06f5f5de896 100644 --- a/cmd/influxd/upgrade/v1_dump_meta.go +++ b/cmd/influxd/upgrade/v1_dump_meta.go @@ -30,9 +30,47 @@ var v1DumpMetaCommand = &cobra.Command{ fmt.Fprintln(os.Stdout, "Databases") fmt.Fprintln(os.Stdout, "---------") - fmt.Fprintf(tw, "%s\t%s\n", "Name", "Default RP") + fmt.Fprintf(tw, "%s\t%s\t%s\n", "Name", "Default RP", "Shards") for _, row := range meta.Databases() { - fmt.Fprintf(tw, "%s\t%s\n", row.Name, row.DefaultRetentionPolicy) + fmt.Fprintf(tw, "%s\t%s\t", row.Name, row.DefaultRetentionPolicy) + for i, si := range row.ShardInfos() { + if i > 0 { + fmt.Fprint(tw, ",") + } + fmt.Fprintf(tw, "%d", si.ID) + } + fmt.Fprintln(tw) + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Retention policies") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", "Database", "Name", "Duration", "Shard Group duration") + for _, db := range meta.Databases() { + for _, rp := range db.RetentionPolicies { + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", db.Name, rp.Name, rp.Duration.String(), rp.ShardGroupDuration.String()) + } + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Shard groups") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", "Database/RP", "Start Time", "End Time", "Shards") + for _, db := range meta.Databases() { + for _, rp := range db.RetentionPolicies { + for _, sg := range rp.ShardGroups { + fmt.Fprintf(tw, "%s/%s\t%s\t%s\t", db.Name, rp.Name, sg.StartTime.String(), sg.EndTime.String()) + for i, si := range sg.Shards { + if i > 0 { + fmt.Fprint(tw, ",") + } + fmt.Fprintf(tw, "%d", si.ID) + } + fmt.Fprintln(tw) + } + } } _ = tw.Flush() fmt.Fprintln(os.Stdout) @@ -44,7 +82,7 @@ var v1DumpMetaCommand = &cobra.Command{ fmt.Fprintf(tw, "%s\t%s\n", row.Name, showBool(row.Admin)) } _ = tw.Flush() - + fmt.Fprintln(os.Stdout) return nil }, } diff --git a/cmd/influxd/upgrade/v2_dump_meta.go b/cmd/influxd/upgrade/v2_dump_meta.go index 1b3d9d74968..437c0b48b13 100644 --- a/cmd/influxd/upgrade/v2_dump_meta.go +++ b/cmd/influxd/upgrade/v2_dump_meta.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/internal/fs" "github.com/spf13/cobra" + "go.uber.org/zap" ) var v2DumpMetaCommand = &cobra.Command{ @@ -17,7 +18,7 @@ var v2DumpMetaCommand = &cobra.Command{ Short: "Dump InfluxDB 2.x influxd.bolt", RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - svc, err := newInfluxDBv2(ctx, &v2DumpMetaOptions) + svc, err := newInfluxDBv2(ctx, &v2DumpMetaOptions, zap.NewNop()) if err != nil { return fmt.Errorf("error opening InfluxDB 2.0: %w", err) } @@ -63,6 +64,88 @@ var v2DumpMetaCommand = &cobra.Command{ _ = tw.Flush() fmt.Fprintln(os.Stdout) + fmt.Fprintln(os.Stdout, "Databases") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\n", "Name", "Default RP", "Shards") + for _, row := range svc.meta.Databases() { + fmt.Fprintf(tw, "%s\t%s\t", row.Name, row.DefaultRetentionPolicy) + for i, si := range row.ShardInfos() { + if i > 0 { + fmt.Fprint(tw, ",") + } + fmt.Fprintf(tw, "%d", si.ID) + } + fmt.Fprintln(tw) + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Retention policies") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", "Database", "Name", "Duration", "Shard Group duration") + for _, db := range svc.meta.Databases() { + for _, rp := range db.RetentionPolicies { + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", db.Name, rp.Name, rp.Duration.String(), rp.ShardGroupDuration.String()) + } + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Shard groups") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", "Database/RP", "Start Time", "End Time", "Shards") + for _, db := range svc.meta.Databases() { + for _, rp := range db.RetentionPolicies { + for _, sg := range rp.ShardGroups { + fmt.Fprintf(tw, "%s/%s\t%s\t%s\t", db.Name, rp.Name, sg.StartTime.String(), sg.EndTime.String()) + for i, si := range sg.Shards { + if i > 0 { + fmt.Fprint(tw, ",") + } + fmt.Fprintf(tw, "%d", si.ID) + } + fmt.Fprintln(tw) + } + } + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Mappings") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\n", "Database", "RP", "Org", "Bucket", "Default") + mappings, _, err := svc.dbrpSvc.FindMany(ctx, influxdb.DBRPMappingFilterV2{}) + if err != nil { + return err + } + showBool := func(b bool) string { + if b { + return "yes" + } + return "no" + } + for _, row := range mappings { + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\n", row.Database, row.RetentionPolicy, row.OrganizationID.String(), row.BucketID.String(), showBool(row.Default)) + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + showCheck := func(b bool) string { + if b { + return "✓" + } + return "" + } + + fmt.Fprintln(os.Stdout, "Users") + fmt.Fprintln(os.Stdout, "-----") + fmt.Fprintf(tw, "%s\t%s\n", "Name", "Admin") + for _, row := range svc.meta.Users() { + fmt.Fprintf(tw, "%s\t%s\n", row.Name, showCheck(row.Admin)) + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + return nil }, }