fix: bufio.Scanner: token too long #19662

Merged 5 commits on Sep 29, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@ need to update any InfluxDB CLI config profiles with the new port number.
1. [19508](https://github.com/influxdata/influxdb/pull/19508): Add subset of InfluxQL coordinator options as flags
1. [19457](https://github.com/influxdata/influxdb/pull/19457): Add ability to export resources by name via the CLI
1. [19640](https://github.com/influxdata/influxdb/pull/19640): Turn on Community Templates
1. [19662](https://github.com/influxdata/influxdb/pull/19662): Add `max-line-length` switch to `influx write` command to address `token too long` errors for large inputs

### Bug Fixes

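Note: the `token too long` error named in the changelog entry comes from `bufio.Scanner`'s default 64 KiB token limit. The following standalone sketch (not code from this PR) shows why a single long line-protocol line fails with the default scanner and how raising the limit, which is what the new `max-line-length` switch enables, avoids it:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// One line longer than bufio's default 64 KiB token limit.
	long := strings.Repeat("a", bufio.MaxScanTokenSize+1)

	// Default scanner: Scan stops early and Err reports "token too long".
	s := bufio.NewScanner(strings.NewReader(long))
	for s.Scan() {
	}
	fmt.Println(s.Err()) // bufio.Scanner: token too long

	// Scanner with a raised ceiling, as the new flag configures for influx write.
	s = bufio.NewScanner(strings.NewReader(long))
	s.Buffer(nil, bufio.MaxScanTokenSize+2)
	for s.Scan() {
	}
	fmt.Println(s.Err()) // <nil>
}
```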
3 changes: 3 additions & 0 deletions cmd/influx/write.go
@@ -37,6 +37,7 @@ type writeFlagsType struct {
Debug bool
SkipRowOnError bool
SkipHeader int
MaxLineLength int
IgnoreDataTypeInColumnName bool
Encoding string
ErrorsFile string
@@ -84,6 +85,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
cmd.PersistentFlags().BoolVar(&writeFlags.Debug, "debug", false, "Log CSV columns to stderr before reading data rows")
cmd.PersistentFlags().BoolVar(&writeFlags.SkipRowOnError, "skipRowOnError", false, "Log CSV data errors to stderr and continue with CSV processing")
cmd.PersistentFlags().IntVar(&writeFlags.SkipHeader, "skipHeader", 0, "Skip the first <n> rows from input data")
cmd.PersistentFlags().IntVar(&writeFlags.MaxLineLength, "max-line-length", 16_000_000, "Specifies the maximum number of bytes that can be read for a single line")
cmd.Flag("skipHeader").NoOptDefVal = "1" // skipHeader flag value is optional, skip the first header when unspecified
cmd.PersistentFlags().BoolVar(&writeFlags.IgnoreDataTypeInColumnName, "xIgnoreDataTypeInColumnName", false, "Ignores dataType which could be specified after ':' in column name")
cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
@@ -319,6 +321,7 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
Precision: writeFlags.Precision,
InsecureSkipVerify: flags.skipVerify,
},
MaxLineLength: writeFlags.MaxLineLength,
}
if err := s.Write(ctx, orgID, bucketID, r); err != nil && err != context.Canceled {
return fmt.Errorf("failed to write data: %v", err)
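Note: for readers unfamiliar with the flag-registration pattern above, here is a small self-contained sketch using spf13/cobra directly. The flag name, variable, and default mirror the diff, but the snippet is illustrative and not part of the influx CLI:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var maxLineLength int

	cmd := &cobra.Command{
		Use: "write",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Println("max line length:", maxLineLength)
			return nil
		},
	}
	// Same registration pattern as cmd/influx/write.go: an int flag whose
	// 16_000_000-byte default matches the one chosen in this PR.
	cmd.PersistentFlags().IntVar(&maxLineLength, "max-line-length", 16_000_000,
		"Specifies the maximum number of bytes that can be read for a single line")

	cmd.SetArgs([]string{"--max-line-length", "1000000"})
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}
```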
15 changes: 6 additions & 9 deletions pkg/csv2lp/line_reader.go
@@ -56,10 +56,9 @@ func NewLineReaderSize(rd io.Reader, size int) *LineReader {
// Read reads data into p. It fills in data that either does
// not contain \n or ends with \n.
// It returns the number of bytes read into p.
func (lr *LineReader) Read(p []byte) (n int, err error) {
n = len(p)
// handle patologic case of reading into empty array
if n == 0 {
func (lr *LineReader) Read(p []byte) (int, error) {
// handle pathological case of reading into empty array
if len(p) == 0 {
if lr.readPos < lr.bufSize {
return 0, nil
}
@@ -71,17 +70,15 @@ func (lr *LineReader) Read(p []byte) (n int, err error) {
return 0, lr.readErr()
}
lr.readPos = 0
lr.bufSize = 0
n, lr.err = lr.rd.Read(lr.buf)
if n == 0 {
lr.bufSize, lr.err = lr.rd.Read(lr.buf)
if lr.bufSize == 0 {
return 0, lr.readErr()
}
lr.bufSize = n
}
// copy at most one line and don't overflow internal buffer or p
i := 0
lr.LastLineNumber = lr.LineNumber
for lr.readPos < lr.bufSize && i < n {
for lr.readPos < lr.bufSize && i < len(p) {
b := lr.buf[lr.readPos]
lr.readPos++
p[i] = b
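Note: the change above drops the named return values because the old code reused `n` for two different counts. It was first set to `len(p)`, then overwritten with the number of bytes read into the internal buffer, yet the copy loop still used it as the bound for writes into `p`; when the internal read returned more bytes than `p` could hold, the loop indexed past the end of `p` and panicked. A reduced illustration of that shape (not the actual csv2lp code):

```go
package main

import (
	"io"
	"strings"
)

// buggyRead reproduces the shape of the old bug: n first means "how much
// fits in p", then "how much was read into buf", and the copy loop trusts
// the wrong one, so it can write past the end of p.
func buggyRead(rd io.Reader, buf, p []byte) (n int, err error) {
	n = len(p)            // n == capacity of p
	n, err = rd.Read(buf) // n == bytes in buf from here on
	for i := 0; i < n; i++ {
		p[i] = buf[i] // index out of range when n > len(p)
	}
	return n, err
}

func main() {
	buf := make([]byte, 16) // internal buffer, larger than p
	p := make([]byte, 2)    // caller's buffer, as in the regression test
	// "foo\nbar" is 7 bytes, so the read into buf sets n to 7 and the
	// copy loop panics at p[2].
	buggyRead(strings.NewReader("foo\nbar"), buf, p)
}
```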
13 changes: 13 additions & 0 deletions pkg/csv2lp/line_reader_test.go
@@ -9,6 +9,7 @@ import (
"testing/iotest"

"github.com/influxdata/influxdb/v2/pkg/csv2lp"
"github.com/influxdata/influxdb/v2/pkg/testing/assert"
"github.com/stretchr/testify/require"
)

@@ -94,6 +95,18 @@ func TestLineReader(t *testing.T) {
}
}

// TestLineReader_Read_BufferOverflow ensures calling Read into
// a slice does not panic. Fixes https://github.com/influxdata/influxdb/issues/19586
func TestLineReader_Read_BufferOverflow(t *testing.T) {
sr := strings.NewReader("foo\nbar")
rd := csv2lp.NewLineReader(sr)
buf := make([]byte, 2)

n, err := rd.Read(buf)
assert.Equal(t, n, 2)
assert.NoError(t, err)
}

// TestLineReader_viaCsv tests correct line reporting when read through a CSV reader with various buffer sizes
// to emulate multiple required reads with a small test data set
func TestLineReader_viaCsv(t *testing.T) {
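Note: beyond the regression test, a typical way to exercise LineReader is to read line by line and observe that each call returns at most one line. This usage sketch assumes only the exported API visible in this diff (`NewLineReader` and `Read`) and that the underlying reader's `io.EOF` is surfaced once the input is drained:

```go
package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/influxdata/influxdb/v2/pkg/csv2lp"
)

func main() {
	// Read fills p with data that either contains no '\n' or ends with one,
	// so each call below yields at most a single line of the input.
	lr := csv2lp.NewLineReader(strings.NewReader("first\nsecond\nthird"))
	p := make([]byte, 64)
	for {
		n, err := lr.Read(p)
		if n > 0 {
			fmt.Printf("%q\n", p[:n]) // "first\n", then "second\n", then "third"
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
	}
}
```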
22 changes: 20 additions & 2 deletions write/batcher.go
@@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"time"
@@ -18,13 +19,19 @@ const (
DefaultInterval = 10 * time.Second
)

var (
// ErrLineTooLong is the error returned when reading a line that exceeds MaxLineLength.
ErrLineTooLong = errors.New("batcher: line too long")
)

// batcher is a write service that batches for another write service.
var _ platform.WriteService = (*Batcher)(nil)

// Batcher batches line protocol for sends to output.
type Batcher struct {
MaxFlushBytes int // MaxFlushBytes is the maximum number of bytes to buffer before flushing
MaxFlushInterval time.Duration // MaxFlushInterval is the maximum amount of time to wait before flushing
MaxLineLength int // MaxLineLength specifies the maximum length of a single line
Service platform.WriteService // Service receives batches flushed from Batcher.
}

@@ -50,7 +57,7 @@ func (b *Batcher) Write(ctx context.Context, org, bucket platform.ID, r io.Reade
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
// onky if there is any error, exit immediately.
// only if there is any error, exit immediately.
if err != nil {
return err
}
@@ -66,6 +73,13 @@ func (b *Batcher) read(ctx context.Context, r io.Reader, lines chan<- []byte, er
defer close(lines)
scanner := bufio.NewScanner(r)
scanner.Split(ScanLines)

maxLineLength := bufio.MaxScanTokenSize
if b.MaxLineLength > 0 {
maxLineLength = b.MaxLineLength
}
scanner.Buffer(nil, maxLineLength)

for scanner.Scan() {
// exit early if the context is done
select {
@@ -75,7 +89,11 @@
return
}
}
errC <- scanner.Err()
err := scanner.Err()
if errors.Is(err, bufio.ErrTooLong) {
err = ErrLineTooLong
}
errC <- err
}

// finishes when the lines channel is closed or context is done.
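Note: the scanning change above is the heart of the fix. The Scanner's token ceiling is raised to `MaxLineLength` (falling back to `bufio.MaxScanTokenSize` when unset), and `bufio.ErrTooLong` is translated into the package-level `ErrLineTooLong` sentinel so callers can test for it with `errors.Is`. A reduced, self-contained sketch of the same loop, using the standard `bufio.ScanLines` rather than the package's own split function:

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

// ErrLineTooLong mirrors the sentinel added in write/batcher.go.
var ErrLineTooLong = errors.New("batcher: line too long")

// scanAll mimics Batcher.read's scanning loop: raise the token limit to
// maxLineLength and map bufio.ErrTooLong onto the sentinel.
func scanAll(r io.Reader, maxLineLength int) ([]string, error) {
	scanner := bufio.NewScanner(r)
	if maxLineLength <= 0 {
		maxLineLength = bufio.MaxScanTokenSize
	}
	scanner.Buffer(nil, maxLineLength)

	var lines []string
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	err := scanner.Err()
	if errors.Is(err, bufio.ErrTooLong) {
		err = ErrLineTooLong
	}
	return lines, err
}

func main() {
	// A 13-byte line against a 5-byte limit, as in the batcher test table.
	_, err := scanAll(strings.NewReader("m1,t1=v1 f1=1"), 5)
	fmt.Println(errors.Is(err, ErrLineTooLong)) // true
}
```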
84 changes: 50 additions & 34 deletions write/batcher_test.go
@@ -13,6 +13,7 @@ import (
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/pkg/testing/assert"
)

func TestScanLines(t *testing.T) {
@@ -76,21 +77,18 @@ func TestBatcher_read(t *testing.T) {
type args struct {
cancel bool
r io.Reader
lines chan []byte
errC chan error
max int
}
tests := []struct {
name string
args args
want []string
wantErr bool
name string
args args
want []string
expErr error
}{
{
name: "reading two lines produces 2 lines",
args: args{
r: strings.NewReader("m1,t1=v1 f1=1\nm2,t2=v2 f2=2"),
lines: make(chan []byte),
errC: make(chan error, 1),
r: strings.NewReader("m1,t1=v1 f1=1\nm2,t2=v2 f2=2"),
},
want: []string{"m1,t1=v1 f1=1\n", "m2,t2=v2 f2=2"},
},
@@ -99,21 +97,42 @@
args: args{
cancel: true,
r: strings.NewReader("m1,t1=v1 f1=1"),
lines: make(chan []byte),
errC: make(chan error, 1),
},
want: []string{},
wantErr: true,
want: nil,
expErr: context.Canceled,
},
{
name: "error from reader returns error",
args: args{
r: &errorReader{},
lines: make(chan []byte),
errC: make(chan error, 1),
r: &errorReader{},
},
want: []string{},
wantErr: true,
want: nil,
expErr: fmt.Errorf("error"),
},
{
name: "error when input exceeds max line length",
args: args{
r: strings.NewReader("m1,t1=v1 f1=1"),
max: 5,
},
want: nil,
expErr: ErrLineTooLong,
},
{
name: "lines greater than MaxScanTokenSize are allowed",
args: args{
r: strings.NewReader(strings.Repeat("a", bufio.MaxScanTokenSize+1)),
max: bufio.MaxScanTokenSize + 2,
},
want: []string{strings.Repeat("a", bufio.MaxScanTokenSize+1)},
},
{
name: "lines greater than MaxScanTokenSize by default are not allowed",
args: args{
r: strings.NewReader(strings.Repeat("a", bufio.MaxScanTokenSize+1)),
},
want: nil,
expErr: ErrLineTooLong,
},
}
for _, tt := range tests {
@@ -122,29 +141,26 @@
var cancel context.CancelFunc
if tt.args.cancel {
ctx, cancel = context.WithCancel(ctx)
cancel()
}

b := &Batcher{}
got := []string{}
b := &Batcher{MaxLineLength: tt.args.max}
var got []string

go b.read(ctx, tt.args.r, tt.args.lines, tt.args.errC)
if cancel != nil {
cancel()
} else {
for line := range tt.args.lines {
lines := make(chan []byte)
errC := make(chan error, 1)

go b.read(ctx, tt.args.r, lines, errC)

if cancel == nil {
for line := range lines {
got = append(got, string(line))
}
}

err := <-tt.args.errC
if (err != nil) != tt.wantErr {
t.Errorf("ScanLines.read() error = %v, wantErr %v", err, tt.wantErr)
return
}

if !cmp.Equal(got, tt.want) {
t.Errorf("%q. Batcher.read() = -got/+want %s", tt.name, cmp.Diff(got, tt.want))
}
err := <-errC
assert.Equal(t, err, tt.expErr)
assert.Equal(t, got, tt.want)
})
}
}