Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cmd/influx): support reading GZIP data in influx write #20763

Merged
merged 15 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
1. [20621](https://github.com/influxdata/influxdb/pull/20621): Add Swift client library to the data loading section of the UI.
1. [20307](https://github.com/influxdata/influxdb/pull/20307): Add `influx task retry-failed` command to rerun failed runs.
1. [20759](https://github.com/influxdata/influxdb/pull/20759): Add additional properties for Mosaic Graph.
1. [20763](https://github.com/influxdata/influxdb/pull/20763): Add `--compression` option to `influx write` to support GZIP inputs.

### Bug Fixes

Expand Down
62 changes: 56 additions & 6 deletions cmd/influx/write.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"compress/gzip"
"context"
"encoding/csv"
"fmt"
Expand All @@ -26,6 +27,8 @@ import (
const (
inputFormatCsv = "csv"
inputFormatLineProtocol = "lp"
inputCompressionNone = "none"
inputCompressionGzip = "gzip"
)

type buildWriteSvcFn func(builder *writeFlagsBuilder) platform.WriteService
Expand All @@ -52,6 +55,7 @@ type writeFlagsBuilder struct {
Encoding string
ErrorsFile string
RateLimit string
Compression string
}

func newWriteFlagsBuilder(svcFn buildWriteSvcFn, f *globalFlags, opt genericCLIOpts) *writeFlagsBuilder {
Expand Down Expand Up @@ -127,6 +131,7 @@ func (b *writeFlagsBuilder) cmd() *cobra.Command {
cmd.PersistentFlags().StringVar(&b.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
cmd.PersistentFlags().StringVar(&b.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows to")
cmd.PersistentFlags().StringVar(&b.RateLimit, "rate-limit", "", "Throttles write, examples: \"5 MB / 5 min\" , \"17kBs\". \"\" (default) disables throttling.")
cmd.PersistentFlags().StringVar(&b.Compression, "compression", "", "Input compression, either 'none' or 'gzip'. Defaults to 'none' unless an input has a '.gz' extension")

cmdDryRun := b.newCmd("dryrun", b.writeDryrunE, false)
cmdDryRun.Args = cobra.MaximumNArgs(1)
Expand Down Expand Up @@ -159,13 +164,32 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
if len(b.Format) > 0 && b.Format != inputFormatLineProtocol && b.Format != inputFormatCsv {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("unsupported input format: %s", b.Format)
}
// validate input compression
if len(b.Compression) > 0 && b.Compression != inputCompressionNone && b.Compression != inputCompressionGzip {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("unsupported input compression: %s", b.Compression)
}

// validate and setup decoding of files/stdin if encoding is supplied
decode, err := csv2lp.CreateDecoder(b.Encoding)
if err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}

// utility to manage common steps used to decode / decompress input sources,
// while tracking resources that must be cleaned-up after reading.
addReader := func(r io.Reader, name string, compressed bool) error {
if compressed {
rcz, err := gzip.NewReader(r)
if err != nil {
return fmt.Errorf("failed to decompress %s: %w", name, err)
}
closers = append(closers, rcz)
r = rcz
}
readers = append(readers, decode(r), strings.NewReader("\n"))
return nil
}

// prepend header lines
if len(b.Headers) > 0 {
for _, header := range b.Headers {
Expand All @@ -184,10 +208,19 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", file, err)
}
closers = append(closers, f)
readers = append(readers, decode(f), strings.NewReader("\n"))
if len(b.Format) == 0 && strings.HasSuffix(file, ".csv") {

fname := file
compressed := b.Compression == "gzip" || (len(b.Compression) == 0 && strings.HasSuffix(fname, ".gz"))
if compressed {
fname = strings.TrimSuffix(fname, ".gz")
}
if len(b.Format) == 0 && strings.HasSuffix(fname, ".csv") {
b.Format = inputFormatCsv
}

if err = addReader(f, file, compressed); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
}
}

Expand All @@ -203,6 +236,7 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
if err != nil {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err)
}
req.Header.Set("Accept-Encoding", "gzip")
resp, err := client.Do(req)
if err != nil {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err)
Expand All @@ -211,11 +245,21 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
if resp.StatusCode/100 != 2 {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: response status_code=%d", addr, resp.StatusCode)
}
readers = append(readers, decode(resp.Body), strings.NewReader("\n"))

compressed := b.Compression == "gzip" ||
resp.Header.Get("Content-Encoding") == "gzip" ||
(len(b.Compression) == 0 && strings.HasSuffix(u.Path, ".gz"))
if compressed {
u.Path = strings.TrimSuffix(u.Path, ".gz")
}
if len(b.Format) == 0 &&
(strings.HasSuffix(u.Path, ".csv") || strings.HasPrefix(resp.Header.Get("Content-Type"), "text/csv")) {
b.Format = inputFormatCsv
}

if err = addReader(resp.Body, addr, compressed); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
}
}

Expand All @@ -224,13 +268,19 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
case len(args) == 0:
// use also stdIn if it is a terminal
if !isCharacterDevice(cmd.InOrStdin()) {
readers = append(readers, decode(cmd.InOrStdin()))
if err = addReader(cmd.InOrStdin(), "stdin", b.Compression == "gzip"); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
}
case args[0] == "-":
// "-" also means stdin
readers = append(readers, decode(cmd.InOrStdin()))
if err = addReader(cmd.InOrStdin(), "stdin", b.Compression == "gzip"); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
default:
readers = append(readers, strings.NewReader(args[0]))
if err = addReader(strings.NewReader(args[0]), "arg 0", b.Compression == "gzip"); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
}

// skipHeader lines when set
Expand Down
Loading