diff --git a/cmd/influx/bucket.go b/cmd/influx/bucket.go index ff4739ef93c..67df9f33927 100644 --- a/cmd/influx/bucket.go +++ b/cmd/influx/bucket.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/cmd/internal" "github.com/influxdata/influxdb/v2/http" "github.com/spf13/cobra" ) @@ -88,7 +89,7 @@ func (b *cmdBucketBuilder) cmdCreateRunEFn(*cobra.Command, []string) error { return err } - dur, err := rawDurationToTimeDuration(b.retention) + dur, err := internal.RawDurationToTimeDuration(b.retention) if err != nil { return err } @@ -273,7 +274,7 @@ func (b *cmdBucketBuilder) cmdUpdateRunEFn(cmd *cobra.Command, args []string) er update.Description = &b.description } - dur, err := rawDurationToTimeDuration(b.retention) + dur, err := internal.RawDurationToTimeDuration(b.retention) if err != nil { return err } diff --git a/cmd/influx/main.go b/cmd/influx/main.go index d05b161a3f5..4c88b427ed4 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -21,7 +20,6 @@ import ( "github.com/influxdata/influxdb/v2/internal/fs" "github.com/influxdata/influxdb/v2/kit/cli" "github.com/influxdata/influxdb/v2/pkg/httpc" - "github.com/influxdata/influxdb/v2/task/options" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -614,50 +612,3 @@ func newBucketService() (influxdb.BucketService, error) { Client: client, }, nil } - -func rawDurationToTimeDuration(raw string) (time.Duration, error) { - if raw == "" { - return 0, nil - } - - if dur, err := time.ParseDuration(raw); err == nil { - return dur, nil - } - - retention, err := options.ParseSignedDuration(raw) - if err != nil { - return 0, err - } - - const ( - day = 24 * time.Hour - week = 7 * day - ) - - var dur time.Duration - for _, d := range retention.Values { - if d.Magnitude < 0 { - return 0, errors.New("must be greater than 0") - } - mag := time.Duration(d.Magnitude) - switch d.Unit { - case "w": - dur += mag * week - case "d": - dur += mag * day - case "m": - dur += mag * time.Minute - case "s": - dur += mag * time.Second - case "ms": - dur += mag * time.Minute - case "us": - dur += mag * time.Microsecond - case "ns": - dur += mag * time.Nanosecond - default: - return 0, errors.New("duration must be week(w), day(d), hour(h), min(m), sec(s), millisec(ms), microsec(us), or nanosec(ns)") - } - } - return dur, nil -} diff --git a/cmd/influx/secret.go b/cmd/influx/secret.go index 18ad303813d..06c6d82360a 100644 --- a/cmd/influx/secret.go +++ b/cmd/influx/secret.go @@ -5,9 +5,10 @@ import ( "fmt" "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/cmd/internal" "github.com/influxdata/influxdb/v2/http" "github.com/spf13/cobra" - input "github.com/tcnksm/go-input" + "github.com/tcnksm/go-input" ) type secretSVCsFn func() (influxdb.SecretService, influxdb.OrganizationService, func(*input.UI) string, error) @@ -245,5 +246,5 @@ func newSecretSVCs() (influxdb.SecretService, influxdb.OrganizationService, func } orgSvc := &http.OrganizationService{Client: httpClient} - return &http.SecretService{Client: httpClient}, orgSvc, getSecret, nil + return &http.SecretService{Client: httpClient}, orgSvc, internal.GetSecret, nil } diff --git a/cmd/influx/setup.go b/cmd/influx/setup.go index 067cb511416..c8c1e0c8a13 100644 --- a/cmd/influx/setup.go +++ b/cmd/influx/setup.go @@ -7,15 +7,15 @@ import ( "os" "path/filepath" "strconv" - "strings" "time" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/cmd/influx/config" "github.com/influxdata/influxdb/v2/cmd/influx/internal" + internal2 "github.com/influxdata/influxdb/v2/cmd/internal" "github.com/influxdata/influxdb/v2/tenant" "github.com/spf13/cobra" - input "github.com/tcnksm/go-input" + "github.com/tcnksm/go-input" ) var setupFlags struct { @@ -31,8 +31,6 @@ var setupFlags struct { username string } -const minPasswordLen int = 8 - func cmdSetup(f *globalFlags, opt genericCLIOpts) *cobra.Command { cmd := opt.newCmd("setup", nil, true) cmd.RunE = setupF @@ -182,7 +180,7 @@ func setupF(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to write config to path %q: %v", dPath, err) } - fmt.Println(string(promptWithColor(fmt.Sprintf("Config %s has been stored in %s.", p.Name, dPath), colorCyan))) + fmt.Println(string(internal2.PromptWithColor(fmt.Sprintf("Config %s has been stored in %s.", p.Name, dPath), internal2.ColorCyan))) w := cmd.OutOrStdout() if setupFlags.json { @@ -224,8 +222,8 @@ func onboardingRequest() (*influxdb.OnboardingRequest, error) { } func nonInteractive() (*influxdb.OnboardingRequest, error) { - if len(setupFlags.password) < minPasswordLen { - return nil, errPasswordIsTooShort + if len(setupFlags.password) < internal2.MinPasswordLen { + return nil, internal2.ErrPasswordIsTooShort } req := &influxdb.OnboardingRequest{ @@ -237,7 +235,7 @@ func nonInteractive() (*influxdb.OnboardingRequest, error) { RetentionPeriod: influxdb.InfiniteRetention, } - dur, err := rawDurationToTimeDuration(setupFlags.retention) + dur, err := internal2.RawDurationToTimeDuration(setupFlags.retention) if err != nil { return nil, err } @@ -253,16 +251,16 @@ func interactive() (req *influxdb.OnboardingRequest, err error) { Reader: os.Stdin, } req = new(influxdb.OnboardingRequest) - fmt.Println(string(promptWithColor(`Welcome to InfluxDB 2.0!`, colorYellow))) + fmt.Println(string(internal2.PromptWithColor(`Welcome to InfluxDB 2.0!`, internal2.ColorYellow))) if setupFlags.username != "" { req.User = setupFlags.username } else { - req.User = getInput(ui, "Please type your primary username", "") + req.User = internal2.GetInput(ui, "Please type your primary username", "") } - if setupFlags.password != "" && len(setupFlags.password) >= minPasswordLen { + if setupFlags.password != "" && len(setupFlags.password) >= internal2.MinPasswordLen { req.Password = setupFlags.password } else { - req.Password = getPassword(ui, false) + req.Password = internal2.GetPassword(ui, false) } if setupFlags.token != "" { req.Token = setupFlags.token @@ -271,15 +269,15 @@ func interactive() (req *influxdb.OnboardingRequest, err error) { if setupFlags.org != "" { req.Org = setupFlags.org } else { - req.Org = getInput(ui, "Please type your primary organization name", "") + req.Org = internal2.GetInput(ui, "Please type your primary organization name", "") } if setupFlags.bucket != "" { req.Bucket = setupFlags.bucket } else { - req.Bucket = getInput(ui, "Please type your primary bucket name", "") + req.Bucket = internal2.GetInput(ui, "Please type your primary bucket name", "") } - dur, err := rawDurationToTimeDuration(setupFlags.retention) + dur, err := internal2.RawDurationToTimeDuration(setupFlags.retention) if err != nil { return nil, err } @@ -288,177 +286,32 @@ func interactive() (req *influxdb.OnboardingRequest, err error) { req.RetentionPeriod = uint(dur / time.Hour) } else { for { - rpStr := getInput(ui, "Please type your retention period in hours.\r\nOr press ENTER for infinite.", strconv.Itoa(influxdb.InfiniteRetention)) + rpStr := internal2.GetInput(ui, "Please type your retention period in hours.\r\nOr press ENTER for infinite.", strconv.Itoa(influxdb.InfiniteRetention)) rp, err := strconv.Atoi(rpStr) if rp >= 0 && err == nil { - req.RetentionPeriod = uint(rp) + req.RetentionPeriod = uint(rp) * uint(time.Hour) break } } } if !setupFlags.force { - if confirmed := getConfirm(ui, req); !confirmed { - return nil, fmt.Errorf("setup was canceled") - } - } - - return req, nil -} - -// vt100EscapeCodes -var ( - keyEscape = byte(27) - colorRed = []byte{keyEscape, '[', '3', '1', 'm'} - colorYellow = []byte{keyEscape, '[', '3', '3', 'm'} - colorCyan = []byte{keyEscape, '[', '3', '6', 'm'} - keyReset = []byte{keyEscape, '[', '0', 'm'} -) - -func promptWithColor(s string, color []byte) []byte { - bb := append(color, []byte(s)...) - return append(bb, keyReset...) -} - -func getConfirm(ui *input.UI, or *influxdb.OnboardingRequest) bool { - prompt := promptWithColor("Confirm? (y/n)", colorRed) - for { - rp := "infinite" - if or.RetentionPeriod > 0 { - rp = fmt.Sprintf("%d hrs", time.Duration(or.RetentionPeriod)/time.Hour) - } - ui.Writer.Write(promptWithColor(fmt.Sprintf(` + if confirmed := internal2.GetConfirm(ui, func() string { + rp := "infinite" + if req.RetentionPeriod > 0 { + rp = fmt.Sprintf("%d hrs", time.Duration(req.RetentionPeriod)/time.Hour) + } + return fmt.Sprintf(` You have entered: Username: %s Organization: %s Bucket: %s Retention Period: %s -`, or.User, or.Org, or.Bucket, rp), colorCyan)) - result, err := ui.Ask(string(prompt), &input.Options{ - HideOrder: true, - }) - if err != nil { - return false - } - switch result { - case "y": - return true - case "n": - return false - default: - continue - } - } -} - -var errPasswordNotMatch = errors.New("passwords do not match") - -var errPasswordIsTooShort error = errors.New("password is too short") - -func getSecret(ui *input.UI) (secret string) { - var err error - query := string(promptWithColor("Please type your secret", colorCyan)) - for { - secret, err = ui.Ask(query, &input.Options{ - Required: true, - HideOrder: true, - Hide: true, - Mask: false, - }) - switch err { - case input.ErrInterrupted: - os.Exit(1) - default: - if secret = strings.TrimSpace(secret); secret == "" { - continue - } - } - break - } - return secret -} - -func getPassword(ui *input.UI, showNew bool) (password string) { - newStr := "" - if showNew { - newStr = " new" - } - var err error -enterPassword: - query := string(promptWithColor("Please type your"+newStr+" password", colorCyan)) - for { - password, err = ui.Ask(query, &input.Options{ - Required: true, - HideOrder: true, - Hide: true, - Mask: false, - ValidateFunc: func(s string) error { - if len(s) < minPasswordLen { - return errPasswordIsTooShort - } - return nil - }, - }) - switch err { - case input.ErrInterrupted: - os.Exit(1) - case errPasswordIsTooShort: - ui.Writer.Write(promptWithColor(fmt.Sprintf("Password too short - minimum length is %d characters!\n\r", minPasswordLen), colorRed)) - continue - default: - if password = strings.TrimSpace(password); password == "" { - continue - } - } - break - } - query = string(promptWithColor("Please type your"+newStr+" password again", colorCyan)) - for { - _, err = ui.Ask(query, &input.Options{ - Required: true, - HideOrder: true, - Hide: true, - ValidateFunc: func(s string) error { - if s != password { - return errPasswordNotMatch - } - return nil - }, - }) - switch err { - case input.ErrInterrupted: - os.Exit(1) - case nil: - // Nothing. - default: - ui.Writer.Write(promptWithColor("Passwords do not match!\n", colorRed)) - goto enterPassword +`, req.User, req.Org, req.Bucket, rp) + }); !confirmed { + return nil, fmt.Errorf("setup was canceled") } - break } - return password -} -func getInput(ui *input.UI, prompt, defaultValue string) string { - option := &input.Options{ - Required: true, - HideOrder: true, - } - if defaultValue != "" { - option.Default = defaultValue - option.HideDefault = true - } - prompt = string(promptWithColor(prompt, colorCyan)) - for { - line, err := ui.Ask(prompt, option) - switch err { - case input.ErrInterrupted: - os.Exit(1) - default: - if line = strings.TrimSpace(line); line == "" { - continue - } - return line - } - } + return req, nil } diff --git a/cmd/influx/template.go b/cmd/influx/template.go index 97f0d6d4135..22752a92468 100644 --- a/cmd/influx/template.go +++ b/cmd/influx/template.go @@ -20,6 +20,7 @@ import ( "github.com/fatih/color" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/cmd/influx/internal" + internal2 "github.com/influxdata/influxdb/v2/cmd/internal" ihttp "github.com/influxdata/influxdb/v2/http" ierror "github.com/influxdata/influxdb/v2/kit/errors" "github.com/influxdata/influxdb/v2/pkger" @@ -1109,7 +1110,7 @@ func (b *cmdTemplateBuilder) getInput(msg, defaultVal string) string { Writer: b.w, Reader: b.in, } - return getInput(ui, msg, defaultVal) + return internal2.GetInput(ui, msg, defaultVal) } func (b *cmdTemplateBuilder) convertURLEncoding(url string) pkger.Encoding { diff --git a/cmd/influx/user.go b/cmd/influx/user.go index 8e6d5f40b20..7ae5a79b008 100644 --- a/cmd/influx/user.go +++ b/cmd/influx/user.go @@ -6,9 +6,10 @@ import ( "fmt" "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/cmd/internal" "github.com/influxdata/influxdb/v2/http" "github.com/spf13/cobra" - input "github.com/tcnksm/go-input" + "github.com/tcnksm/go-input" ) type userSVCsFn func() (cmdUserDeps, error) @@ -365,7 +366,7 @@ func newUserSVC() (cmdUserDeps, error) { orgSvc := &http.OrganizationService{Client: httpClient} passSvc := &http.PasswordService{Client: httpClient} urmSvc := &http.UserResourceMappingService{Client: httpClient} - getPassFn := getPassword + getPassFn := internal.GetPassword return cmdUserDeps{ userSVC: userSvc, diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index 8b9819382ae..d064c04fb26 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -33,7 +33,6 @@ func main() { // FIXME //generate.Command, //restore.Command, - upgrade.Command, &cobra.Command{ Use: "version", Short: "Print the influxd server version", @@ -43,6 +42,9 @@ func main() { }, ) + // upgrade binds options to env variables, so it must be added after rootCmd is initialized + rootCmd.AddCommand(upgrade.NewCommand()) + if err := rootCmd.Execute(); err != nil { os.Exit(1) } diff --git a/cmd/influxd/upgrade/config.go b/cmd/influxd/upgrade/config.go new file mode 100644 index 00000000000..73906a051e8 --- /dev/null +++ b/cmd/influxd/upgrade/config.go @@ -0,0 +1,12 @@ +package upgrade + +import ( + "errors" + + "go.uber.org/zap" +) + +// upgradeConfig backups existing config file and updates it with upgraded config. +func upgradeConfig(configFile string, targetOptions optionsV2, log *zap.Logger) (*configV1, error) { + return nil, errors.New("not implemented") +} diff --git a/cmd/influxd/upgrade/database.go b/cmd/influxd/upgrade/database.go new file mode 100644 index 00000000000..c925c1af1d5 --- /dev/null +++ b/cmd/influxd/upgrade/database.go @@ -0,0 +1,13 @@ +package upgrade + +import ( + "context" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/errors" + "go.uber.org/zap" +) + +// upgradeDatabases creates databases, buckets, retention policies and shard info according to 1.x meta and copies data +func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, orgID influxdb.ID, log *zap.Logger) (map[string][]string, error) { + return nil, errors.New("not implemented") +} diff --git a/cmd/influxd/upgrade/security.go b/cmd/influxd/upgrade/security.go new file mode 100644 index 00000000000..0aec7cc3c3d --- /dev/null +++ b/cmd/influxd/upgrade/security.go @@ -0,0 +1,11 @@ +package upgrade + +import ( + "errors" + "go.uber.org/zap" +) + +// generateSecurityScript generates security upgrade script. +func generateSecurityScript(v1 *influxDBv1, dbBuckets map[string][]string, log *zap.Logger) error { + return errors.New("not implemented") +} diff --git a/cmd/influxd/upgrade/setup.go b/cmd/influxd/upgrade/setup.go new file mode 100644 index 00000000000..1f3a54895a9 --- /dev/null +++ b/cmd/influxd/upgrade/setup.go @@ -0,0 +1,131 @@ +package upgrade + +import ( + "context" + "fmt" + "os" + "strconv" + "time" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/cmd/internal" + "github.com/tcnksm/go-input" +) + +func setupAdmin(ctx context.Context, v2 *influxDBv2, req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + res, err := v2.onboardSvc.OnboardInitialUser(ctx, req) + + if err != nil { + return nil, fmt.Errorf("onboarding error: %w", err) + } + return res, nil +} + +func isInteractive() bool { + return !options.force || + options.target.userName == "" || + options.target.password == "" || + options.target.orgName == "" || + options.target.bucket == "" +} + +func onboardingRequest() (*influxdb.OnboardingRequest, error) { + if isInteractive() { + return interactive() + } + return nonInteractive() +} + +func nonInteractive() (*influxdb.OnboardingRequest, error) { + if len(options.target.password) < internal.MinPasswordLen { + return nil, internal.ErrPasswordIsTooShort + } + req := &influxdb.OnboardingRequest{ + User: options.target.userName, + Password: options.target.password, + Token: options.target.token, + Org: options.target.orgName, + Bucket: options.target.bucket, + RetentionPeriod: influxdb.InfiniteRetention, + } + + dur, err := internal.RawDurationToTimeDuration(options.target.retention) + if err != nil { + return nil, err + } + if dur > 0 { + req.RetentionPeriod = uint(dur / time.Hour) + } + return req, nil +} + +func interactive() (req *influxdb.OnboardingRequest, err error) { + ui := &input.UI{ + Writer: os.Stdout, + Reader: os.Stdin, + } + req = new(influxdb.OnboardingRequest) + fmt.Println(string(internal.PromptWithColor(`Welcome to InfluxDB 2.0 upgrade!`, internal.ColorYellow))) + if options.target.userName != "" { + req.User = options.target.userName + } else { + req.User = internal.GetInput(ui, "Please type your primary username", "") + } + if options.target.password != "" && len(options.target.password) >= internal.MinPasswordLen { + req.Password = options.target.password + } else { + req.Password = internal.GetPassword(ui, false) + } + if options.target.token != "" { + req.Token = options.target.token + // else auto-generated by service + } + if options.target.orgName != "" { + req.Org = options.target.orgName + } else { + req.Org = internal.GetInput(ui, "Please type your primary organization name", "") + } + if options.target.bucket != "" { + req.Bucket = options.target.bucket + } else { + req.Bucket = internal.GetInput(ui, "Please type your primary bucket name", "") + } + + dur, err := internal.RawDurationToTimeDuration(options.target.retention) + if err != nil { + return nil, err + } + + if dur > 0 { + req.RetentionPeriod = uint(dur / time.Hour) + } else { + for { + rpStr := internal.GetInput(ui, "Please type your retention period in hours.\r\nOr press ENTER for infinite.", strconv.Itoa(influxdb.InfiniteRetention)) + rp, err := strconv.Atoi(rpStr) + if rp >= 0 && err == nil { + req.RetentionPeriod = uint(rp) * uint(time.Hour) + break + } + } + } + + if !options.force { + if confirmed := internal.GetConfirm(ui, func() string { + rp := "infinite" + if req.RetentionPeriod > 0 { + rp = fmt.Sprintf("%d hrs", time.Duration(req.RetentionPeriod)/time.Hour) + } + return fmt.Sprintf(` +You have entered: + Username: %s + Organization: %s + Bucket: %s + Retention Period: %s +`, req.User, req.Org, req.Bucket, rp) + }); !confirmed { + return nil, fmt.Errorf("setup was canceled") + } + } + + return req, nil +} diff --git a/cmd/influxd/upgrade/upgrade.go b/cmd/influxd/upgrade/upgrade.go index 5fe318cfcd8..53edda3ae72 100644 --- a/cmd/influxd/upgrade/upgrade.go +++ b/cmd/influxd/upgrade/upgrade.go @@ -2,21 +2,24 @@ package upgrade import ( "context" + "errors" "fmt" + "io/ioutil" "os" "os/user" "path/filepath" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/authorizer" "github.com/influxdata/influxdb/v2/bolt" "github.com/influxdata/influxdb/v2/dbrp" "github.com/influxdata/influxdb/v2/internal/fs" + "github.com/influxdata/influxdb/v2/kit/cli" "github.com/influxdata/influxdb/v2/kit/metric" "github.com/influxdata/influxdb/v2/kit/prom" "github.com/influxdata/influxdb/v2/kv" "github.com/influxdata/influxdb/v2/kv/migration" "github.com/influxdata/influxdb/v2/kv/migration/all" + "github.com/influxdata/influxdb/v2/storage" "github.com/influxdata/influxdb/v2/tenant" "github.com/influxdata/influxdb/v2/v1/services/meta" "github.com/influxdata/influxdb/v2/v1/services/meta/filestore" @@ -24,18 +27,49 @@ import ( "go.uber.org/zap" ) -var Command = &cobra.Command{ - Use: "upgrade", - Short: "Upgrade a 1.x version of InfluxDB", - RunE: runUpgradeE, +// Simplified 1.x config. +type configV1 struct { + Meta struct { + Dir string `toml:"dir"` + } `toml:"meta"` + Data struct { + Dir string `toml:"dir"` + WALDir string `toml:"wal-dir"` + } `toml:"data"` } type optionsV1 struct { metaDir string + walDir string + dataDir string + // cmd option + dbDir string + configFile string +} + +func (o *optionsV1) checkDirs() error { + if o.metaDir == "" || o.dataDir == "" || o.walDir == "" { + if o.dbDir == "" { + return errors.New("source directory not specified") + } else { + o.metaDir = filepath.Join(o.dbDir, "meta") + o.dataDir = filepath.Join(o.dbDir, "data") + o.walDir = filepath.Join(o.dbDir, "wal") + } + } + return nil } type optionsV2 struct { - boltPath string + boltPath string + enginePath string + userName string + password string + orgName string + bucket string + token string + retention string + securityScriptPath string } var options = struct { @@ -44,10 +78,16 @@ var options = struct { // flags for target InfluxDB target optionsV2 + + // verbose output + verbose bool + + logPath string + + force bool }{} -func init() { - flags := Command.Flags() +func NewCommand() *cobra.Command { // source flags v1dir, err := influxDirV1() @@ -55,19 +95,133 @@ func init() { panic("error fetching default InfluxDB 1.x dir: " + err.Error()) } - flags.StringVar(&options.source.metaDir, "v1-meta-dir", filepath.Join(v1dir, "meta"), "Path to 1.x meta.db directory") - // target flags v2dir, err := fs.InfluxDir() if err != nil { panic("error fetching default InfluxDB 2.0 dir: " + err.Error()) } - flags.StringVar(&options.target.boltPath, "v2-bolt-path", filepath.Join(v2dir, "influxd.bolt"), "Path to 2.0 metadata") + cmd := &cobra.Command{ + Use: "upgrade", + Short: "Upgrade a 1.x version of InfluxDB", + Long: ` + Upgrades a 1.x version of InfluxDB by performing following actions: + 1. Reads 1.x config file and creates 2.x config file with matching options. Unsupported 1.x options are reported. + 2. Copies and upgrades 1.x database files. + 3. Creates a script for creating 1.x users and their permissions. This scripts needs to be revised and run manually after starting 2.x. + + If config file is not available, 1.x db folder (--v1-dir options) is taken as an input. + Target 2.x database dir is specified by the --engine-path option. If changed, bolt path should to be changed as well. +`, + RunE: runUpgradeE, + } + + opts := []cli.Opt{ + { + DestP: &options.source.dbDir, + Flag: "v1-dir", + Default: v1dir, + Desc: "path to source 1.x db directory containing meta, data and wal sub-folders", + }, + { + DestP: &options.verbose, + Flag: "verbose", + Default: true, + Desc: "verbose output", + Short: 'v', + }, + { + DestP: &options.target.boltPath, + Flag: "bolt-path", + Default: filepath.Join(v2dir, bolt.DefaultFilename), + Desc: "path for boltdb database", + Short: 'm', + }, + { + DestP: &options.target.enginePath, + Flag: "engine-path", + Default: filepath.Join(v2dir, "engine"), + Desc: "path for persistent engine files", + Short: 'e', + }, + { + DestP: &options.target.userName, + Flag: "username", + Default: "", + Desc: "primary username", + Short: 'u', + Required: true, + }, + { + DestP: &options.target.password, + Flag: "password", + Default: "", + Desc: "password for username", + Short: 'p', + Required: true, + }, + { + DestP: &options.target.orgName, + Flag: "org", + Default: "", + Desc: "primary organization name", + Short: 'o', + Required: true, + }, + { + DestP: &options.target.bucket, + Flag: "bucket", + Default: "", + Desc: "primary bucket name", + Short: 'b', + Required: true, + }, + { + DestP: &options.target.retention, + Flag: "retention", + Default: "", + Desc: "optional: duration bucket will retain data. 0 is infinite. Default is 0.", + Short: 'r', + }, + { + DestP: &options.target.token, + Flag: "token", + Default: "", + Desc: "optional: token for username, else auto-generated", + Short: 't', + }, + { + DestP: &options.source.configFile, + Flag: "config-file", + Default: influxConfigPathV1(), + Desc: "optional: Custom InfluxDB 1.x config file path, else the default config file", + }, + { + DestP: &options.target.securityScriptPath, + Flag: "security-script", + Default: filepath.Join(homeOrAnyDir(), "influxd-upgrade-security.sh"), + Desc: "optional: generated security upgrade script path", + }, + { + DestP: &options.logPath, + Flag: "log-path", + Default: filepath.Join(homeOrAnyDir(), "upgrade.log"), + Desc: "optional: custom log file path", + }, + { + DestP: &options.force, + Flag: "force", + Default: false, + Desc: "skip confirmation prompt", + Short: 'f', + }, + } + cli.BindOptions(cmd, opts) // add sub commands - Command.AddCommand(v1DumpMetaCommand) - Command.AddCommand(v2DumpMetaCommand) + cmd.AddCommand(v1DumpMetaCommand) + cmd.AddCommand(v2DumpMetaCommand) + return cmd } type influxDBv1 struct { @@ -75,41 +229,142 @@ type influxDBv1 struct { } type influxDBv2 struct { + log *zap.Logger boltClient *bolt.Client store *bolt.KVStore kvStore kv.SchemaStore tenantStore *tenant.Store ts *tenant.Service dbrpSvc influxdb.DBRPMappingServiceV2 + bucketSvc influxdb.BucketService onboardSvc influxdb.OnboardingService kvService *kv.Service meta *meta.Client } -func runUpgradeE(cmd *cobra.Command, args []string) error { +func (i *influxDBv2) close() error { + err := i.meta.Close() + if err != nil { + return err + } + err = i.boltClient.Close() + if err != nil { + return err + } + err = i.store.Close() + if err != nil { + return err + } + return nil +} + +func runUpgradeE(*cobra.Command, []string) error { ctx := context.Background() + config := zap.NewProductionConfig() + config.OutputPaths = append(config.OutputPaths, options.logPath) + config.ErrorOutputPaths = append(config.ErrorOutputPaths, options.logPath) + log, err := config.Build() + if err != nil { + return err + } + log.Info("Starting InfluxDB 1.x upgrade") + + if options.source.configFile != "" { + log.Info("Upgrading config file", zap.String("file", options.source.configFile)) + if _, err := os.Stat(options.source.configFile); err != nil { + return err + } + v1Config, err := upgradeConfig(options.source.configFile, options.target, log) + if err != nil { + return err + } + options.source.metaDir = v1Config.Meta.Dir + options.source.dataDir = v1Config.Data.Dir + options.source.walDir = v1Config.Data.WALDir + } else { + log.Info("No InfluxDB 1.x config file specified, skipping its upgrade") + } + + if err := options.source.checkDirs(); err != nil { + return err + } + + metaDBPath := filepath.Join(options.source.metaDir, "meta.db") + if _, err := os.Stat(metaDBPath); err != nil { + return fmt.Errorf("1.x metadb error: %w", err) + } + + log.Info("Upgrade source paths", zap.String("meta", options.source.metaDir), zap.String("data", options.source.dataDir)) + log.Info("Upgrade target paths", zap.String("bolt", options.target.boltPath), zap.String("engine", options.target.enginePath)) + + if fi, err := os.Stat(options.target.enginePath); err == nil { + if !fi.IsDir() { + return fmt.Errorf("engine path '%s' is not directory", options.target.enginePath) + } + entries, err := ioutil.ReadDir(options.target.enginePath) + if err != nil { + return err + } + if len(entries) > 0 { + return fmt.Errorf("target engine path '%s' must be empty", options.target.enginePath) + } + } v1, err := newInfluxDBv1(&options.source) if err != nil { return err } - _ = v1 - v2, err := newInfluxDBv2(ctx, &options.target) + v2, err := newInfluxDBv2(ctx, &options.target, log) if err != nil { return err } - _ = v2 - // 1. Onboard the initial admin user - // v2.onboardSvc.OnboardInitialUser() + defer func() { + if err := v2.close(); err != nil { + log.Error("Failed to close 2.0 services", zap.Error(err)) + } + }() - // 2. read each database / retention policy from v1.meta and create bucket db-name/rp-name - // newBucket := v2.ts.CreateBucket(ctx, Bucket{}) - // - // 3. create database in v2.meta - // v2.meta.CreateDatabase(newBucket.ID.String()) - // copy shard info from v1.meta + canOnboard, err := v2.onboardSvc.IsOnboarding(ctx) + if err != nil { + return err + } + + if !canOnboard { + return errors.New("InfluxDB has been already set up") + } + + req, err := onboardingRequest() + if err != nil { + return err + } + or, err := setupAdmin(ctx, v2, req) + if err != nil { + return err + } + + options.target.token = or.Auth.Token + + db2BucketIds, err := upgradeDatabases(ctx, v1, v2, or.Org.ID, log) + if err != nil { + //remove all files + log.Info("Database upgrade error, removing data") + if e := os.Remove(options.target.boltPath); e != nil { + log.Error("Unable to remove bolt database.", zap.Error(e)) + } + + if e := os.RemoveAll(options.target.enginePath); e != nil { + log.Error("Unable to remove time series data.", zap.Error(e)) + } + return err + } + + if err = generateSecurityScript(v1, db2BucketIds, log); err != nil { + return err + } + + log.Info("Upgrade successfully completed. Start service now") return nil } @@ -124,11 +379,11 @@ func newInfluxDBv1(opts *optionsV1) (svc *influxDBv1, err error) { return svc, nil } -func newInfluxDBv2(ctx context.Context, opts *optionsV2) (svc *influxDBv2, err error) { - log := zap.NewNop() +func newInfluxDBv2(ctx context.Context, opts *optionsV2, log *zap.Logger) (svc *influxDBv2, err error) { reg := prom.NewRegistry(log.With(zap.String("service", "prom_registry"))) svc = &influxDBv2{} + svc.log = log // ********************* // V2 specific services @@ -179,8 +434,16 @@ func newInfluxDBv2(ctx context.Context, opts *optionsV2) (svc *influxDBv2, err e } // DB/RP service - svc.dbrpSvc = dbrp.NewService(ctx, authorizer.NewBucketService(svc.ts.BucketService), svc.kvStore) + svc.dbrpSvc = dbrp.NewService(ctx, svc.ts.BucketService, svc.kvStore) + svc.bucketSvc = svc.ts.BucketService + engine := storage.NewEngine( + opts.enginePath, + storage.NewConfig(), + storage.WithMetaClient(svc.meta), + ) + + svc.ts.BucketService = storage.NewBucketService(svc.ts.BucketService, engine) // on-boarding service (influx setup) svc.onboardSvc = tenant.NewOnboardService(svc.ts, authSvc) @@ -219,3 +482,42 @@ func influxDirV1() (string, error) { return dir, nil } + +// influxConfigFileV1 returns default 1.x config file path or empty path if not found. +func influxConfigPathV1() string { + if envVar := os.Getenv("INFLUXDB_CONFIG_PATH"); envVar != "" { + return envVar + } + for _, path := range []string{ + os.ExpandEnv("${HOME}/.influxdb/influxdb.conf"), + "/etc/influxdb/influxdb.conf", + } { + if _, err := os.Stat(path); err == nil { + return path + } + } + + return "" +} + +// homeOrAnyDir retrieves user's home directory, current working one or just none. +func homeOrAnyDir() string { + var dir string + u, err := user.Current() + if err == nil { + dir = u.HomeDir + } else if home := os.Getenv("HOME"); home != "" { + dir = home + } else if home := os.Getenv("USERPROFILE"); home != "" { + dir = home + } else { + wd, err := os.Getwd() + if err != nil { + dir = "" + } else { + dir = wd + } + } + + return dir +} diff --git a/cmd/influxd/upgrade/upgrade_test.go b/cmd/influxd/upgrade/upgrade_test.go new file mode 100644 index 00000000000..c11626c8940 --- /dev/null +++ b/cmd/influxd/upgrade/upgrade_test.go @@ -0,0 +1,60 @@ +package upgrade + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/influxdata/influxdb/v2/bolt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPathValidations(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "") + require.Nil(t, err) + + defer os.RemoveAll(tmpdir) + + v1Dir := filepath.Join(tmpdir, "v1db") + v2Dir := filepath.Join(tmpdir, "v2db") + + boltPath := filepath.Join(v2Dir, bolt.DefaultFilename) + enginePath := filepath.Join(v2Dir, "engine") + + err = os.MkdirAll(filepath.Join(enginePath, "db"), 0777) + require.Nil(t, err) + + largs := make([]string, 0, 9) + largs = append(largs, "--username", "my-user") + largs = append(largs, "--password", "my-password") + largs = append(largs, "--org", "my-org") + largs = append(largs, "--bucket", "my-bucket") + largs = append(largs, "--retention", "7d") + largs = append(largs, "--token", "my-token") + largs = append(largs, "--v1-dir", v1Dir) + largs = append(largs, "--bolt-path", boltPath) + largs = append(largs, "--engine-path", enginePath) + largs = append(largs, "--config-file", "") + + cmd := NewCommand() + cmd.SetArgs(largs) + + err = cmd.Execute() + require.NotNil(t, err, "Must fail") + assert.Contains(t, err.Error(), "1.x metadb error") + + err = os.MkdirAll(filepath.Join(v1Dir, "meta"), 0777) + require.Nil(t, err) + + err = ioutil.WriteFile(filepath.Join(v1Dir, "meta", "meta.db"), []byte{1}, 0777) + require.Nil(t, err) + + cmd = NewCommand() + cmd.SetArgs(largs) + + err = cmd.Execute() + require.NotNil(t, err, "Must fail") + assert.Contains(t, err.Error(), "target engine path") +} diff --git a/cmd/influxd/upgrade/v1_dump_meta.go b/cmd/influxd/upgrade/v1_dump_meta.go index 172db1cb87d..06f5f5de896 100644 --- a/cmd/influxd/upgrade/v1_dump_meta.go +++ b/cmd/influxd/upgrade/v1_dump_meta.go @@ -30,9 +30,47 @@ var v1DumpMetaCommand = &cobra.Command{ fmt.Fprintln(os.Stdout, "Databases") fmt.Fprintln(os.Stdout, "---------") - fmt.Fprintf(tw, "%s\t%s\n", "Name", "Default RP") + fmt.Fprintf(tw, "%s\t%s\t%s\n", "Name", "Default RP", "Shards") for _, row := range meta.Databases() { - fmt.Fprintf(tw, "%s\t%s\n", row.Name, row.DefaultRetentionPolicy) + fmt.Fprintf(tw, "%s\t%s\t", row.Name, row.DefaultRetentionPolicy) + for i, si := range row.ShardInfos() { + if i > 0 { + fmt.Fprint(tw, ",") + } + fmt.Fprintf(tw, "%d", si.ID) + } + fmt.Fprintln(tw) + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Retention policies") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", "Database", "Name", "Duration", "Shard Group duration") + for _, db := range meta.Databases() { + for _, rp := range db.RetentionPolicies { + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", db.Name, rp.Name, rp.Duration.String(), rp.ShardGroupDuration.String()) + } + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Shard groups") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", "Database/RP", "Start Time", "End Time", "Shards") + for _, db := range meta.Databases() { + for _, rp := range db.RetentionPolicies { + for _, sg := range rp.ShardGroups { + fmt.Fprintf(tw, "%s/%s\t%s\t%s\t", db.Name, rp.Name, sg.StartTime.String(), sg.EndTime.String()) + for i, si := range sg.Shards { + if i > 0 { + fmt.Fprint(tw, ",") + } + fmt.Fprintf(tw, "%d", si.ID) + } + fmt.Fprintln(tw) + } + } } _ = tw.Flush() fmt.Fprintln(os.Stdout) @@ -44,7 +82,7 @@ var v1DumpMetaCommand = &cobra.Command{ fmt.Fprintf(tw, "%s\t%s\n", row.Name, showBool(row.Admin)) } _ = tw.Flush() - + fmt.Fprintln(os.Stdout) return nil }, } diff --git a/cmd/influxd/upgrade/v2_dump_meta.go b/cmd/influxd/upgrade/v2_dump_meta.go index 1b3d9d74968..437c0b48b13 100644 --- a/cmd/influxd/upgrade/v2_dump_meta.go +++ b/cmd/influxd/upgrade/v2_dump_meta.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/internal/fs" "github.com/spf13/cobra" + "go.uber.org/zap" ) var v2DumpMetaCommand = &cobra.Command{ @@ -17,7 +18,7 @@ var v2DumpMetaCommand = &cobra.Command{ Short: "Dump InfluxDB 2.x influxd.bolt", RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - svc, err := newInfluxDBv2(ctx, &v2DumpMetaOptions) + svc, err := newInfluxDBv2(ctx, &v2DumpMetaOptions, zap.NewNop()) if err != nil { return fmt.Errorf("error opening InfluxDB 2.0: %w", err) } @@ -63,6 +64,88 @@ var v2DumpMetaCommand = &cobra.Command{ _ = tw.Flush() fmt.Fprintln(os.Stdout) + fmt.Fprintln(os.Stdout, "Databases") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\n", "Name", "Default RP", "Shards") + for _, row := range svc.meta.Databases() { + fmt.Fprintf(tw, "%s\t%s\t", row.Name, row.DefaultRetentionPolicy) + for i, si := range row.ShardInfos() { + if i > 0 { + fmt.Fprint(tw, ",") + } + fmt.Fprintf(tw, "%d", si.ID) + } + fmt.Fprintln(tw) + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Retention policies") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", "Database", "Name", "Duration", "Shard Group duration") + for _, db := range svc.meta.Databases() { + for _, rp := range db.RetentionPolicies { + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", db.Name, rp.Name, rp.Duration.String(), rp.ShardGroupDuration.String()) + } + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Shard groups") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", "Database/RP", "Start Time", "End Time", "Shards") + for _, db := range svc.meta.Databases() { + for _, rp := range db.RetentionPolicies { + for _, sg := range rp.ShardGroups { + fmt.Fprintf(tw, "%s/%s\t%s\t%s\t", db.Name, rp.Name, sg.StartTime.String(), sg.EndTime.String()) + for i, si := range sg.Shards { + if i > 0 { + fmt.Fprint(tw, ",") + } + fmt.Fprintf(tw, "%d", si.ID) + } + fmt.Fprintln(tw) + } + } + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + fmt.Fprintln(os.Stdout, "Mappings") + fmt.Fprintln(os.Stdout, "---------") + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\n", "Database", "RP", "Org", "Bucket", "Default") + mappings, _, err := svc.dbrpSvc.FindMany(ctx, influxdb.DBRPMappingFilterV2{}) + if err != nil { + return err + } + showBool := func(b bool) string { + if b { + return "yes" + } + return "no" + } + for _, row := range mappings { + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\n", row.Database, row.RetentionPolicy, row.OrganizationID.String(), row.BucketID.String(), showBool(row.Default)) + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + + showCheck := func(b bool) string { + if b { + return "✓" + } + return "" + } + + fmt.Fprintln(os.Stdout, "Users") + fmt.Fprintln(os.Stdout, "-----") + fmt.Fprintf(tw, "%s\t%s\n", "Name", "Admin") + for _, row := range svc.meta.Users() { + fmt.Fprintf(tw, "%s\t%s\n", row.Name, showCheck(row.Admin)) + } + _ = tw.Flush() + fmt.Fprintln(os.Stdout) + return nil }, } diff --git a/cmd/internal/input.go b/cmd/internal/input.go new file mode 100644 index 00000000000..196f2e380d9 --- /dev/null +++ b/cmd/internal/input.go @@ -0,0 +1,209 @@ +package internal + +import ( + "errors" + "fmt" + "github.com/influxdata/influxdb/v2/task/options" + "os" + "strings" + "time" + + "github.com/tcnksm/go-input" +) + +const MinPasswordLen int = 8 + +var ( + ErrPasswordNotMatch = fmt.Errorf("passwords do not match") + ErrPasswordIsTooShort = fmt.Errorf("password is too short") +) + +// vt100EscapeCodes +var ( + KeyEscape = byte(27) + ColorRed = []byte{KeyEscape, '[', '3', '1', 'm'} + ColorYellow = []byte{KeyEscape, '[', '3', '3', 'm'} + ColorCyan = []byte{KeyEscape, '[', '3', '6', 'm'} + KeyReset = []byte{KeyEscape, '[', '0', 'm'} +) + +func PromptWithColor(s string, color []byte) []byte { + bb := append(color, []byte(s)...) + return append(bb, KeyReset...) +} + +func GetConfirm(ui *input.UI, promptFunc func() string) bool { + prompt := PromptWithColor("Confirm? (y/n)", ColorRed) + for { + ui.Writer.Write(PromptWithColor(promptFunc(), ColorCyan)) + result, err := ui.Ask(string(prompt), &input.Options{ + HideOrder: true, + }) + if err != nil { + return false + } + switch result { + case "y": + return true + case "n": + return false + default: + continue + } + } +} + +func GetSecret(ui *input.UI) (secret string) { + var err error + query := string(PromptWithColor("Please type your secret", ColorCyan)) + for { + secret, err = ui.Ask(query, &input.Options{ + Required: true, + HideOrder: true, + Hide: true, + Mask: false, + }) + switch err { + case input.ErrInterrupted: + os.Exit(1) + default: + if secret = strings.TrimSpace(secret); secret == "" { + continue + } + } + break + } + return secret +} + +func GetPassword(ui *input.UI, showNew bool) (password string) { + newStr := "" + if showNew { + newStr = " new" + } + var err error +enterPassword: + query := string(PromptWithColor("Please type your"+newStr+" password", ColorCyan)) + for { + password, err = ui.Ask(query, &input.Options{ + Required: true, + HideOrder: true, + Hide: true, + Mask: false, + ValidateFunc: func(s string) error { + if len(s) < 8 { + return ErrPasswordIsTooShort + } + return nil + }, + }) + switch err { + case input.ErrInterrupted: + os.Exit(1) + case ErrPasswordIsTooShort: + ui.Writer.Write(PromptWithColor("Password too short - minimum length is 8 characters.\n\r", ColorRed)) + continue + default: + if password = strings.TrimSpace(password); password == "" { + continue + } + } + break + } + query = string(PromptWithColor("Please type your"+newStr+" password again", ColorCyan)) + for { + _, err = ui.Ask(query, &input.Options{ + Required: true, + HideOrder: true, + Hide: true, + ValidateFunc: func(s string) error { + if s != password { + return ErrPasswordNotMatch + } + return nil + }, + }) + switch err { + case input.ErrInterrupted: + os.Exit(1) + case nil: + // Nothing. + default: + ui.Writer.Write(PromptWithColor("Passwords do not match.\n", ColorRed)) + goto enterPassword + } + break + } + return password +} + +func GetInput(ui *input.UI, prompt, defaultValue string) string { + option := &input.Options{ + Required: true, + HideOrder: true, + } + if defaultValue != "" { + option.Default = defaultValue + option.HideDefault = true + } + prompt = string(PromptWithColor(prompt, ColorCyan)) + for { + line, err := ui.Ask(prompt, option) + switch err { + case input.ErrInterrupted: + os.Exit(1) + default: + if line = strings.TrimSpace(line); line == "" { + continue + } + return line + } + } +} + +func RawDurationToTimeDuration(raw string) (time.Duration, error) { + if raw == "" { + return 0, nil + } + + if dur, err := time.ParseDuration(raw); err == nil { + return dur, nil + } + + retention, err := options.ParseSignedDuration(raw) + if err != nil { + return 0, err + } + + const ( + day = 24 * time.Hour + week = 7 * day + ) + + var dur time.Duration + for _, d := range retention.Values { + if d.Magnitude < 0 { + return 0, errors.New("must be greater than 0") + } + mag := time.Duration(d.Magnitude) + switch d.Unit { + case "w": + dur += mag * week + case "d": + dur += mag * day + case "m": + dur += mag * time.Minute + case "s": + dur += mag * time.Second + case "ms": + dur += mag * time.Minute + case "us": + dur += mag * time.Microsecond + case "ns": + dur += mag * time.Nanosecond + default: + return 0, errors.New("duration must be week(w), day(d), hour(h), min(m), sec(s), millisec(ms), microsec(us), or nanosec(ns)") + } + } + return dur, nil +}