diff --git a/.circleci/config.yml b/.circleci/config.yml index 53b69f7f25b..4159395fd5a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,5 +1,12 @@ version: 2 +workflows: + version: 2 + build: + jobs: + - build + - fluxtest + jobs: build: machine: @@ -17,3 +24,13 @@ jobs: name: Execute test command: ./test.sh $CIRCLE_NODE_INDEX no_output_timeout: 1500s + fluxtest: + machine: + enabled: true + docker_layer_caching: true + steps: + - checkout + - run: + name: Execute test + command: ./test.sh flux + no_output_timeout: 1500s diff --git a/.gitignore b/.gitignore index 592248a2e37..804a8632d39 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,10 @@ config.json # ignore generated files. cmd/influxd/version.go +# Flux test harness artifacts +fluxtest +flux.zip + # executables *.test diff --git a/CHANGELOG.md b/CHANGELOG.md index 3505410e25b..22e34b2edb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,8 @@ v1.9.0 [unreleased] - [#20544](https://github.com/influxdata/influxdb/pull/20544): feat(tsi): optimize series iteration - [#17814](https://github.com/influxdata/influxdb/pull/17814): feat(prometheus): update prometheus remote protocol - [#17596](https://github.com/influxdata/influxdb/pull/17596): improvement(query): performance improvement for sorted merge iterator [Tristan Su] -- [#21015](https://github.com/influxdata/influxdb/pull/21015): chore: go 1.15.10 and flux 0.108.1 upgrade +- [#21015](https://github.com/influxdata/influxdb/pull/21015): build: upgrade to go 1.15.10 +- [#21074](https://github.com/influxdata/influxdb/pull/21074): feat: upgrade to flux 0.111.0 ### Bugfixes diff --git a/Dockerfile_build_ubuntu64 b/Dockerfile_build_ubuntu64 index c923a41674d..1e16a5092cc 100644 --- a/Dockerfile_build_ubuntu64 +++ b/Dockerfile_build_ubuntu64 @@ -1,4 +1,4 @@ -FROM ubuntu:xenial +FROM ubuntu:focal RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ asciidoc \ @@ -12,8 +12,7 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ mercurial \ pkg-config \ python \ - python-boto \ - python-software-properties \ + python3-pip \ rpm \ ruby \ ruby-dev \ @@ -22,12 +21,13 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ xmlto \ zip +RUN pip3 install boto RUN gem install fpm RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain stable -y # setup environment -ENV GO_VERSION 1.13.8 +ENV GO_VERSION 1.15.10 ENV GOARCH amd64 ENV GOROOT /usr/local/go ENV GOPATH /root/go @@ -42,5 +42,3 @@ RUN mkdir -p $PROJECT_DIR WORKDIR $PROJECT_DIR VOLUME $PROJECT_DIR - -ENTRYPOINT [ "/root/influxdb/build.py" ] diff --git a/Dockerfile_build_ubuntu64_git b/Dockerfile_build_ubuntu64_git deleted file mode 100644 index 59a1d2808ad..00000000000 --- a/Dockerfile_build_ubuntu64_git +++ /dev/null @@ -1,44 +0,0 @@ -FROM ubuntu:xenial - -RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ - python-software-properties \ - software-properties-common \ - wget \ - git \ - mercurial \ - make \ - ruby \ - ruby-dev \ - build-essential \ - rpm \ - zip \ - python \ - python-boto - -RUN gem install fpm - -# Setup env -ENV GOPATH /root/go -ENV PROJECT_DIR $GOPATH/src/github.com/influxdata/influxdb -ENV PATH $GOPATH/bin:$PATH -RUN mkdir -p $PROJECT_DIR - -VOLUME $PROJECT_DIR - - -# Install go -ENV GO_VERSION 1.13 -ENV GO_ARCH amd64 -RUN wget --no-verbose https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz && \ - tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz && \ - rm /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz - -# Clone Go tip for compilation -ENV GOROOT_BOOTSTRAP /usr/local/go -RUN git clone https://go.googlesource.com/go -ENV PATH /go/bin:$PATH - -# Add script for compiling go -ENV GO_CHECKOUT master -ADD ./gobuild.sh /gobuild.sh -ENTRYPOINT [ "/gobuild.sh" ] diff --git a/build.py b/build.py index 29d6c90ed76..8d2e5bcb6c1 100755 --- a/build.py +++ b/build.py @@ -1,4 +1,4 @@ -#!/usr/bin/python2.7 -u +#!/usr/bin/env python3 import sys import os diff --git a/cmd/influxd/run/command.go b/cmd/influxd/run/command.go index d6c3dceadcf..530f1b084c2 100644 --- a/cmd/influxd/run/command.go +++ b/cmd/influxd/run/command.go @@ -15,6 +15,7 @@ import ( "strconv" "time" + fluxinit "github.com/influxdata/influxdb/flux/init" "github.com/influxdata/influxdb/logger" "go.uber.org/zap" ) @@ -101,6 +102,11 @@ func (cmd *Command) Run(args ...string) error { go func() { http.ListenAndServe("localhost:6060", nil) }() } + // Initialize the Flux built-ins if enabled. + if config.HTTPD.FluxEnabled { + fluxinit.Initialize() + } + // Print sweet InfluxDB logo. if !config.Logging.SuppressLogo && logger.IsTerminal(cmd.Stdout) { fmt.Fprint(cmd.Stdout, logo) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index f65a2abf50a..a32f0da1e4c 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -70,7 +70,8 @@ type Server struct { BindAddress string Listener net.Listener - Logger *zap.Logger + Logger *zap.Logger + MuxLogger *log.Logger MetaClient *meta.Client @@ -172,7 +173,8 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { BindAddress: bind, - Logger: logger.New(os.Stderr), + Logger: logger.New(os.Stderr), + MuxLogger: tcp.MuxLogger(os.Stderr), MetaClient: meta.NewClient(c.Meta), @@ -263,6 +265,7 @@ func (s *Server) appendSnapshotterService() { // after the Open method has been called. func (s *Server) SetLogOutput(w io.Writer) { s.Logger = logger.New(w) + s.MuxLogger = tcp.MuxLogger(w) } func (s *Server) appendMonitorService() { @@ -391,7 +394,7 @@ func (s *Server) Open() error { s.Listener = ln // Multiplex listener. - mux := tcp.NewMux() + mux := tcp.NewMux(s.MuxLogger) go mux.Serve(ln) // Append services. diff --git a/flux/control/controller.go b/flux/control/controller.go index c52374f4564..e30bdc5efc4 100644 --- a/flux/control/controller.go +++ b/flux/control/controller.go @@ -5,11 +5,11 @@ import ( "time" "github.com/influxdata/flux" + "github.com/influxdata/flux/dependencies/testing" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/runtime" "github.com/influxdata/influxdb/coordinator" - "github.com/influxdata/influxdb/flux/builtin" "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -72,15 +72,13 @@ func (p *program) Start(ctx context.Context, allocator *memory.Allocator) (flux. } func NewController(mc MetaClient, reader influxdb.Reader, auth Authorizer, authEnabled bool, writer influxdb.PointsWriter, logger *zap.Logger) *Controller { - builtin.Initialize() - storageDeps, err := influxdb.NewDependencies(mc, reader, auth, authEnabled, writer) if err != nil { panic(err) } return &Controller{ - deps: []flux.Dependency{storageDeps}, + deps: append([]flux.Dependency{storageDeps}, testing.FrameworkConfig{}), logger: logger, } } diff --git a/flux/builtin/builtin.go b/flux/init/init.go similarity index 61% rename from flux/builtin/builtin.go rename to flux/init/init.go index 4c56914458d..d74cb25083f 100644 --- a/flux/builtin/builtin.go +++ b/flux/init/init.go @@ -1,23 +1,20 @@ -// Package builtin ensures all packages related to Flux built-ins are imported and initialized. +// Package init ensures all packages related to Flux built-ins are imported and initialized. // This should only be imported from main or test packages. // It is a mistake to import it from any other package. -package builtin +// +// NOTE: This is a superset-wrapper of Flux's built-in initialization logic. +// It also ensures V1-specific flux builtins are initialized. +package init import ( - "sync" - "github.com/influxdata/flux/runtime" _ "github.com/influxdata/flux/stdlib" _ "github.com/influxdata/influxdb/flux/stdlib" ) -var once sync.Once - // Initialize ensures all Flux builtins are configured and should be called // prior to using the Flux runtime. Initialize is safe to call concurrently // and is idempotent. func Initialize() { - once.Do(func() { - runtime.FinalizeBuiltIns() - }) + runtime.FinalizeBuiltIns() } diff --git a/flux/init/static/static.go b/flux/init/static/static.go new file mode 100644 index 00000000000..83fa169d974 --- /dev/null +++ b/flux/init/static/static.go @@ -0,0 +1,9 @@ +// The init/static package can be imported in test cases and other uses +// cases where it is okay to always initialize flux. +package static + +import fluxinit "github.com/influxdata/influxdb/flux/init" + +func init() { + fluxinit.Initialize() +} diff --git a/flux/stdlib/influxdata/influxdb/filter_test.flux b/flux/stdlib/influxdata/influxdb/filter_test.flux new file mode 100644 index 00000000000..2f39f442323 --- /dev/null +++ b/flux/stdlib/influxdata/influxdb/filter_test.flux @@ -0,0 +1,41 @@ +package influxdb_test + +import "csv" +import "testing" +import "testing/expect" + +option now = () => (2030-01-01T00:00:00Z) + +input = "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83 +,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.63 +,,1,2018-05-22T19:53:26Z,system,host.local,load3,1.72 +,,2,2018-05-22T19:53:26Z,system,host.local,load4,1.77 +,,2,2018-05-22T19:53:36Z,system,host.local,load4,1.78 +,,2,2018-05-22T19:53:46Z,system,host.local,load4,1.77 +" + +testcase filter { + expect.planner(rules: [ + "influxdata/influxdb.FromStorageRule": 1, + "PushDownRangeRule": 1, + "PushDownFilterRule": 1, + ]) + + want = csv.from(csv: "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83 +,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.63 +") + + got = testing.loadStorage(csv: input) + |> range(start: -100y) + |> filter(fn: (r) => r._measurement == "system" and r._field == "load1") + |> drop(columns: ["_start", "_stop"]) + testing.diff(want, got) +} diff --git a/flux/stdlib/influxdata/influxdb/from.go b/flux/stdlib/influxdata/influxdb/from.go index 1c51fac86ee..f674e9f185f 100644 --- a/flux/stdlib/influxdata/influxdb/from.go +++ b/flux/stdlib/influxdata/influxdb/from.go @@ -6,97 +6,32 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/codes" "github.com/influxdata/flux/plan" - "github.com/influxdata/flux/runtime" "github.com/influxdata/flux/stdlib/influxdata/influxdb" ) const FromKind = "influxDBFrom" -type FromOpSpec struct { - Bucket string `json:"bucket,omitempty"` - BucketID string `json:"bucketID,omitempty"` -} - -func init() { - fromSignature := runtime.MustLookupBuiltinType("influxdata/influxdb", "from") - runtime.ReplacePackageValue("influxdata/influxdb", influxdb.FromKind, flux.MustValue(flux.FunctionValue(FromKind, createFromOpSpec, fromSignature))) - flux.RegisterOpSpec(FromKind, newFromOp) - plan.RegisterProcedureSpec(FromKind, newFromProcedure, FromKind) -} - -func createFromOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) { - spec := new(FromOpSpec) - - if bucket, ok, err := args.GetString("bucket"); err != nil { - return nil, err - } else if ok { - spec.Bucket = bucket - } - - if bucketID, ok, err := args.GetString("bucketID"); err != nil { - return nil, err - } else if ok { - spec.BucketID = bucketID - } - - if spec.Bucket == "" && spec.BucketID == "" { - return nil, &flux.Error{ - Code: codes.Invalid, - Msg: "must specify one of bucket or bucketID", - } - } - if spec.Bucket != "" && spec.BucketID != "" { - return nil, &flux.Error{ - Code: codes.Invalid, - Msg: "must specify only one of bucket or bucketID", - } - } - return spec, nil -} - -func newFromOp() flux.OperationSpec { - return new(FromOpSpec) -} - -func (s *FromOpSpec) Kind() flux.OperationKind { - return FromKind -} - -type FromProcedureSpec struct { - Bucket string - BucketID string -} - -func newFromProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) { - spec, ok := qs.(*FromOpSpec) - if !ok { - return nil, &flux.Error{ - Code: codes.Internal, - Msg: fmt.Sprintf("invalid spec type %T", qs), - } - } +type ( + NameOrID = influxdb.NameOrID + FromOpSpec = influxdb.FromOpSpec +) - return &FromProcedureSpec{ - Bucket: spec.Bucket, - BucketID: spec.BucketID, - }, nil +type FromStorageProcedureSpec struct { + Bucket influxdb.NameOrID } -func (s *FromProcedureSpec) Kind() plan.ProcedureKind { +func (s *FromStorageProcedureSpec) Kind() plan.ProcedureKind { return FromKind } -func (s *FromProcedureSpec) Copy() plan.ProcedureSpec { - ns := new(FromProcedureSpec) - +func (s *FromStorageProcedureSpec) Copy() plan.ProcedureSpec { + ns := new(FromStorageProcedureSpec) ns.Bucket = s.Bucket - ns.BucketID = s.BucketID - return ns } -func (s *FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error { - // FromProcedureSpec is a logical operation representing any read +func (s *FromStorageProcedureSpec) PostPhysicalValidate(id plan.NodeID) error { + // FromStorageProcedureSpec is a logical operation representing any read // from storage. However as a logical operation, it doesn't specify // how data is to be read from storage. It is the query planner's // job to determine the optimal read strategy and to convert this @@ -108,10 +43,10 @@ func (s *FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error { // not support unbounded reads, and so this query must not be // validated. var bucket string - if len(s.Bucket) > 0 { - bucket = s.Bucket + if len(s.Bucket.Name) > 0 { + bucket = s.Bucket.Name } else { - bucket = s.BucketID + bucket = s.Bucket.ID } return &flux.Error{ Code: codes.Invalid, diff --git a/flux/stdlib/influxdata/influxdb/from_test.go b/flux/stdlib/influxdata/influxdb/from_test.go index daba8c93621..f7cf0e38514 100644 --- a/flux/stdlib/influxdata/influxdb/from_test.go +++ b/flux/stdlib/influxdata/influxdb/from_test.go @@ -1 +1,44 @@ package influxdb_test + +import ( + "context" + "testing" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/plan" + "github.com/influxdata/flux/plan/plantest" + "github.com/influxdata/flux/stdlib/influxdata/influxdb" + "github.com/influxdata/flux/stdlib/universe" + qinfluxdb "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb" + "github.com/stretchr/testify/require" +) + +func TestFromValidation(t *testing.T) { + spec := plantest.PlanSpec{ + // from |> group (cannot query an infinite time range) + Nodes: []plan.Node{ + plan.CreateLogicalNode("from", &influxdb.FromProcedureSpec{ + Bucket: influxdb.NameOrID{Name: "my-bucket"}, + }), + plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{ + GroupMode: flux.GroupModeBy, + GroupKeys: []string{"_measurement", "_field"}, + }), + }, + Edges: [][2]int{ + {0, 1}, + }, + } + + ps := plantest.CreatePlanSpec(&spec) + pp := plan.NewPhysicalPlanner(plan.OnlyPhysicalRules( + qinfluxdb.FromStorageRule{}, + qinfluxdb.PushDownRangeRule{}, + qinfluxdb.PushDownFilterRule{}, + qinfluxdb.PushDownGroupRule{}, + )) + _, err := pp.Plan(context.Background(), ps) + require.Error(t, err, "Expected query with no call to range to fail physical planning") + want := `cannot submit unbounded read to "my-bucket"; try bounding 'from' with a call to 'range'` + require.Equal(t, want, err.Error()) +} diff --git a/flux/stdlib/influxdata/influxdb/rules.go b/flux/stdlib/influxdata/influxdb/rules.go index 1063cebd1f2..90430379c86 100644 --- a/flux/stdlib/influxdata/influxdb/rules.go +++ b/flux/stdlib/influxdata/influxdb/rules.go @@ -5,9 +5,11 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/codes" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/semantic" + "github.com/influxdata/flux/stdlib/influxdata/influxdb" "github.com/influxdata/flux/stdlib/universe" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/storage/reads/datatypes" @@ -15,6 +17,7 @@ import ( func init() { plan.RegisterPhysicalRules( + FromStorageRule{}, PushDownRangeRule{}, PushDownFilterRule{}, PushDownGroupRule{}, @@ -22,6 +25,35 @@ func init() { PushDownReadTagValuesRule{}, SortedPivotRule{}, ) + plan.RegisterLogicalRules( + universe.MergeFiltersRule{}, + ) +} + +type FromStorageRule struct{} + +func (rule FromStorageRule) Name() string { + return "influxdata/influxdb.FromStorageRule" +} + +func (rule FromStorageRule) Pattern() plan.Pattern { + return plan.Pat(influxdb.FromKind) +} + +func (rule FromStorageRule) Rewrite(ctx context.Context, node plan.Node) (plan.Node, bool, error) { + fromSpec := node.ProcedureSpec().(*influxdb.FromProcedureSpec) + if fromSpec.Host != nil { + return node, false, nil + } else if fromSpec.Org != nil { + return node, false, &flux.Error{ + Code: codes.Unimplemented, + Msg: "reads from the storage engine cannot read from a separate organization; please specify a host or remove the organization", + } + } + + return plan.CreateLogicalNode("fromStorage", &FromStorageProcedureSpec{ + Bucket: fromSpec.Bucket, + }), true, nil } // PushDownGroupRule pushes down a group operation to storage @@ -77,12 +109,12 @@ func (rule PushDownRangeRule) Pattern() plan.Pattern { // Rewrite converts 'from |> range' into 'ReadRange' func (rule PushDownRangeRule) Rewrite(ctx context.Context, node plan.Node) (plan.Node, bool, error) { fromNode := node.Predecessors()[0] - fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec) + fromSpec := fromNode.ProcedureSpec().(*FromStorageProcedureSpec) rangeSpec := node.ProcedureSpec().(*universe.RangeProcedureSpec) return plan.CreatePhysicalNode("ReadRange", &ReadRangePhysSpec{ - Bucket: fromSpec.Bucket, - BucketID: fromSpec.BucketID, + Bucket: fromSpec.Bucket.Name, + BucketID: fromSpec.Bucket.ID, Bounds: rangeSpec.Bounds, }), true, nil } diff --git a/flux/stdlib/influxdata/influxdb/rules_test.go b/flux/stdlib/influxdata/influxdb/rules_test.go index f7e0f202227..b7115aa303a 100644 --- a/flux/stdlib/influxdata/influxdb/rules_test.go +++ b/flux/stdlib/influxdata/influxdb/rules_test.go @@ -1,16 +1,19 @@ package influxdb_test import ( + "context" "testing" "time" "github.com/influxdata/flux" "github.com/influxdata/flux/ast" "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/plan/plantest" "github.com/influxdata/flux/semantic" + fluxinfluxdb "github.com/influxdata/flux/stdlib/influxdata/influxdb" "github.com/influxdata/flux/stdlib/universe" "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb" "github.com/influxdata/influxdb/storage/reads/datatypes" @@ -23,8 +26,8 @@ func fluxTime(t int64) flux.Time { } func TestPushDownRangeRule(t *testing.T) { - fromSpec := influxdb.FromProcedureSpec{ - Bucket: "my-bucket", + fromSpec := influxdb.FromStorageProcedureSpec{ + Bucket: influxdb.NameOrID{Name: "my-bucket"}, } rangeSpec := universe.RangeProcedureSpec{ Bounds: flux.Bounds{ @@ -314,7 +317,7 @@ func TestPushDownFilterRule(t *testing.T) { }, Before: &plantest.PlanSpec{ Nodes: []plan.Node{ - plan.CreateLogicalNode("from", &influxdb.FromProcedureSpec{}), + plan.CreateLogicalNode("from", &influxdb.FromStorageProcedureSpec{}), plan.CreatePhysicalNode("range", &universe.RangeProcedureSpec{ Bounds: bounds, }), @@ -863,8 +866,8 @@ func TestPushDownGroupRule(t *testing.T) { } func TestReadTagKeysRule(t *testing.T) { - fromSpec := influxdb.FromProcedureSpec{ - Bucket: "my-bucket", + fromSpec := influxdb.FromStorageProcedureSpec{ + Bucket: influxdb.NameOrID{Name: "my-bucket"}, } rangeSpec := universe.RangeProcedureSpec{ Bounds: flux.Bounds{ @@ -1089,8 +1092,8 @@ func TestReadTagKeysRule(t *testing.T) { } func TestReadTagValuesRule(t *testing.T) { - fromSpec := influxdb.FromProcedureSpec{ - Bucket: "my-bucket", + fromSpec := influxdb.FromStorageProcedureSpec{ + Bucket: influxdb.NameOrID{Name: "my-bucket"}, } rangeSpec := universe.RangeProcedureSpec{ Bounds: flux.Bounds{ @@ -1315,3 +1318,77 @@ func TestReadTagValuesRule(t *testing.T) { }) } } + +func TestMergeFilterRule(t *testing.T) { + from := &fluxinfluxdb.FromProcedureSpec{} + filter0 := func() *universe.FilterProcedureSpec { + return &universe.FilterProcedureSpec{ + Fn: interpreter.ResolvedFunction{ + Fn: executetest.FunctionExpression(t, `(r) => r._field == "usage_idle"`), + }, + } + } + filter1 := func() *universe.FilterProcedureSpec { + return &universe.FilterProcedureSpec{ + Fn: interpreter.ResolvedFunction{ + Fn: executetest.FunctionExpression(t, `(r) => r._measurement == "cpu"`), + }, + } + } + filterMerge := func() *universe.FilterProcedureSpec { + return &universe.FilterProcedureSpec{ + Fn: interpreter.ResolvedFunction{ + Fn: executetest.FunctionExpression(t, `(r) => r._measurement == "cpu" and r._field == "usage_idle"`), + }, + } + } + + testcases := []plantest.RuleTestCase{ + { + Context: context.Background(), + Name: "merge filter on", + Rules: []plan.Rule{universe.MergeFiltersRule{}}, + Before: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("from", from), + plan.CreatePhysicalNode("filter0", filter0()), + plan.CreatePhysicalNode("filter1", filter1()), + }, + Edges: [][2]int{ + {0, 1}, + {1, 2}, + }, + }, + After: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("from", from), + plan.CreatePhysicalNode("filter0", filterMerge()), + }, + Edges: [][2]int{{0, 1}}, + }, + }, + { + Context: context.Background(), + Name: "merge filter off", + Before: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("from", from), + plan.CreatePhysicalNode("filter0", filter0()), + plan.CreatePhysicalNode("filter1", filter1()), + }, + Edges: [][2]int{ + {0, 1}, + {1, 2}, + }, + }, + NoChange: true, + }, + } + for _, tc := range testcases { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + t.Parallel() + plantest.LogicalRuleTestHelper(t, &tc) + }) + } +} diff --git a/flux/stdlib/influxdata/influxdb/to_test.go b/flux/stdlib/influxdata/influxdb/to_test.go index 288ab0ee42b..4d9207e5530 100644 --- a/flux/stdlib/influxdata/influxdb/to_test.go +++ b/flux/stdlib/influxdata/influxdb/to_test.go @@ -13,17 +13,13 @@ import ( "github.com/influxdata/flux/querytest" "github.com/influxdata/flux/values/valuestest" "github.com/influxdata/influxdb/coordinator" - "github.com/influxdata/influxdb/flux/builtin" + _ "github.com/influxdata/influxdb/flux/init/static" "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/stretchr/testify/assert" ) -func init() { - builtin.Initialize() -} - func TestTo_Query(t *testing.T) { tests := []querytest.NewQueryTestCase{ { @@ -32,9 +28,9 @@ func TestTo_Query(t *testing.T) { Want: &flux.Spec{ Operations: []*flux.Operation{ { - ID: "influxDBFrom0", + ID: "from0", Spec: &influxdb.FromOpSpec{ - Bucket: "mydb", + Bucket: influxdb.NameOrID{Name: "mydb"}, }, }, { @@ -47,7 +43,7 @@ func TestTo_Query(t *testing.T) { }, }, Edges: []flux.Edge{ - {Parent: "influxDBFrom0", Child: "influx1x/toKind1"}, + {Parent: "from0", Child: "influx1x/toKind1"}, }, }, }, diff --git a/flux/stdlib/universe/merge_filter_test.flux b/flux/stdlib/universe/merge_filter_test.flux new file mode 100644 index 00000000000..7745fec968e --- /dev/null +++ b/flux/stdlib/universe/merge_filter_test.flux @@ -0,0 +1,42 @@ +package universe_test + +import "testing" +import "testing/expect" +import "planner" + +option now = () => (2030-01-01T00:00:00Z) + +input = " +#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,host,_field,_value +,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83 +,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.63 +,,1,2018-05-22T19:53:26Z,system,host.local,load3,1.72 +,,2,2018-05-22T19:53:26Z,system,host.local,load4,1.77 +,,2,2018-05-22T19:53:36Z,system,host.local,load4,1.78 +,,2,2018-05-22T19:53:46Z,system,host.local,load4,1.77 +" + +output = " +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,string,double +#group,false,false,true,true,false,true,true,true,false +#default,_result,,,,,,,, +,result,table,_start,_stop,_time,_measurement,host,_field,_value +,,0,2018-05-22T19:53:26Z,2030-01-01T00:00:00Z,2018-05-22T19:53:26Z,system,host.local,load4,1.77 +,,0,2018-05-22T19:53:26Z,2030-01-01T00:00:00Z,2018-05-22T19:53:46Z,system,host.local,load4,1.77 +" + +merge_filter_fn = () => + testing.loadStorage(csv: input) + |> range(start: 2018-05-22T19:53:26Z) + |> filter(fn: (r) => r["_value"] == 1.77) + |> filter(fn: (r) => r["_field"] == "load4") + +testcase merge_filter { + expect.planner(rules: ["MergeFiltersRule": 1]) + + result = merge_filter_fn() + testing.diff(got: result, want: testing.loadMem(csv: output)) +} diff --git a/go.mod b/go.mod index 2f0321e6227..360b96f304c 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/snappy v0.0.1 github.com/google/go-cmp v0.5.0 - github.com/influxdata/flux v0.108.1 + github.com/influxdata/flux v0.111.0 github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93 github.com/influxdata/pkg-config v0.2.7 github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6 @@ -31,6 +31,7 @@ require ( github.com/prometheus/prometheus v0.0.0-20200609090129-a6600f564e3c github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52 github.com/spf13/cast v1.3.0 + github.com/spf13/cobra v0.0.3 github.com/stretchr/testify v1.5.1 github.com/tinylib/msgp v1.1.0 github.com/willf/bitset v1.1.9 // indirect diff --git a/go.sum b/go.sum index 24f47c6c773..00840eebc2c 100644 --- a/go.sum +++ b/go.sum @@ -428,13 +428,12 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/flux v0.65.0 h1:57tk1Oo4gpGIDbV12vUAPCMtLtThhaXzub1XRIuqv6A= github.com/influxdata/flux v0.65.0/go.mod h1:BwN2XG2lMszOoquQaFdPET8FRQfrXiZsWmcMO9rkaVY= -github.com/influxdata/flux v0.105.1 h1:OnSjI/KZ80+rU0tlFmpm8eTfxzdx1Dt/+HAdh0/7Lxo= -github.com/influxdata/flux v0.105.1/go.mod h1:QEVEEaLEVtLXJ9YQzvVLVMoCmElUyqkFYSjfy1BEKiE= -github.com/influxdata/flux v0.108.1 h1:0rmK9ri/VJuNRyco/OtwPf+WCzrNrfuH9rSyibbe+D4= -github.com/influxdata/flux v0.108.1/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU= +github.com/influxdata/flux v0.111.0 h1:27CNz0SbEofD9NzdwcdxRwGmuVSDSisVq4dOceB/KF0= +github.com/influxdata/flux v0.111.0/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU= github.com/influxdata/influxdb v1.8.0/go.mod h1:SIzcnsjaHRFpmlxpJ4S3NT64qtEKYweNTUMb/vh0OMQ= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/influxdata/influxql v1.1.0/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= @@ -442,7 +441,7 @@ github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93 h1:4t/8PcmLn github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= -github.com/influxdata/pkg-config v0.2.5/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= +github.com/influxdata/pkg-config v0.2.6/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= github.com/influxdata/pkg-config v0.2.7 h1:LPTCWmcPkyMryHHnf+STK/zVUjQ6OCvvOukSDlJLY9I= github.com/influxdata/pkg-config v0.2.7/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8= @@ -450,6 +449,7 @@ github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6 h1:UzJnB7VRL github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE= github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9 h1:MHTrDWmQpHq/hkq+7cw9oYAt2PqUw52TZazRA0N7PGE= github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0= +github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b h1:i44CesU68ZBRvtCjBi3QSosCIKrjmMbYlQMFAwVLds4= github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 h1:+TUUmaFa4YD1Q+7bH9o5NCHQGPMqZCYJiNW6lIIS9z4= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:Wbbw6tYNvwa5dlB6304Sd+82Z3f7PmVZHVKU637d4po= @@ -693,6 +693,7 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= diff --git a/gobuild.sh b/gobuild.sh deleted file mode 100755 index 9a96e7e9b79..00000000000 --- a/gobuild.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash -# This script run inside the Dockerfile_build_ubuntu64_git container and -# gets the latests Go source code and compiles it. -# Then passes control over to the normal build.py script - -set -e - -cd /go/src -git fetch --all -git checkout $GO_CHECKOUT -# Merge in recent changes if we are on a branch -# if we checked out a tag just ignore the error -git pull || true -./make.bash - -# Run normal build.py -cd "$PROJECT_DIR" -exec ./build.py "$@" diff --git a/internal/cmd/fluxtest-harness-influxdb/test.go b/internal/cmd/fluxtest-harness-influxdb/test.go new file mode 100644 index 00000000000..8bd20fd2fd4 --- /dev/null +++ b/internal/cmd/fluxtest-harness-influxdb/test.go @@ -0,0 +1,231 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/cmd/flux/cmd" + "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/execute/table" + "github.com/influxdata/flux/parser" + fluxClient "github.com/influxdata/influxdb/flux/client" + "github.com/influxdata/influxdb/tests" + "github.com/spf13/cobra" +) + +type testExecutor struct { + ctx context.Context + writeOptAST *ast.File + readOptAST *ast.File + i int +} + +func NewTestExecutor(ctx context.Context) (cmd.TestExecutor, error) { + e := &testExecutor{ctx: ctx} + e.init() + return e, nil +} + +func (t *testExecutor) init() { + t.writeOptAST = prepareOptions(writeOptSource) + t.readOptAST = prepareOptions(readOptSource) +} + +func (t *testExecutor) Close() error { + // Servers are closed as part of Run + return nil +} + +// Run executes an e2e test case for every supported index type. +// On failure, logs collected from the server will be printed to stderr. +func (t *testExecutor) Run(pkg *ast.Package) error { + var failed bool + for _, idx := range []string{"inmem", "tsi1"} { + logOut := &bytes.Buffer{} + if err := t.run(pkg, idx, logOut); err != nil { + failed = true + _, _ = fmt.Fprintf(os.Stderr, "Failed for index %s:\n%v\n", idx, err) + _, _ = io.Copy(os.Stderr, logOut) + } + } + + if failed { + return errors.New("test failed for some index, see logs for details") + } + return nil +} + +// run executes an e2e test case against a specific index type. +// Server logs will be written to the specified logOut writer, for reporting. +func (t *testExecutor) run(pkg *ast.Package, index string, logOut io.Writer) error { + _, _ = fmt.Fprintf(os.Stderr, "Testing %s...\n", index) + + config := tests.NewConfig() + config.HTTPD.FluxEnabled = true + config.HTTPD.FluxLogEnabled = true + config.Data.Index = index + + s := tests.NewServer(config) + s.SetLogOutput(logOut) + if err := s.Open(); err != nil { + return err + } + defer s.Close() + + dbName := fmt.Sprintf("%04d", t.i) + t.i++ + + if _, err := s.CreateDatabase(dbName); err != nil { + return err + } + defer func() { _ = s.DropDatabase(dbName) }() + + // Define bucket and org options + bucketOpt := &ast.OptionStatement{ + Assignment: &ast.VariableAssignment{ + ID: &ast.Identifier{Name: "bucket"}, + Init: &ast.StringLiteral{Value: dbName + "/autogen"}, + }, + } + + // During the first execution, we are performing the writes + // that are in the testcase. We do not care about errors. + _ = t.executeWithOptions(bucketOpt, t.writeOptAST, pkg, s.URL()) + + // Execute the read pass. + return t.executeWithOptions(bucketOpt, t.readOptAST, pkg, s.URL()) +} + +// executeWithOptions runs a Flux query against a running server via the HTTP API. +// Flux queries executed by this method are expected to return no output on success. If the API call returns any data, +// it is formatted as a table and returned wrapped in an error. +func (t *testExecutor) executeWithOptions(bucketOpt *ast.OptionStatement, optionsAST *ast.File, pkg *ast.Package, serverUrl string) error { + options := optionsAST.Copy().(*ast.File) + options.Body = append([]ast.Statement{bucketOpt}, options.Body...) + + // Add options to pkg + pkg = pkg.Copy().(*ast.Package) + pkg.Files = append([]*ast.File{options}, pkg.Files...) + + bs, err := json.Marshal(pkg) + if err != nil { + return err + } + + query := fluxClient.QueryRequest{}.WithDefaults() + query.AST = bs + query.Dialect.Annotations = csv.DefaultDialect().Annotations + j, err := json.Marshal(query) + if err != nil { + return err + } + + u, err := url.Parse(serverUrl) + if err != nil { + return err + } + u.Path = "/api/v2/query" + req, err := http.NewRequest("POST", u.String(), bytes.NewBuffer(j)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return err + } + if resp.StatusCode/100 != 2 { + b, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf("error response from flux query: %s", string(b)) + } + + decoder := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{}) + r, err := decoder.Decode(resp.Body) + if err != nil { + return err + } + defer r.Release() + + for r.More() { + v := r.Next() + + if err := v.Tables().Do(func(tbl flux.Table) error { + // The data returned here is the result of `testing.diff`, so any result means that + // a comparison of two tables showed inequality. Capture that inequality as part of the error. + // XXX: rockstar (08 Dec 2020) - This could use some ergonomic work, as the diff testOutput + // is not exactly "human readable." + return fmt.Errorf("%s", table.Stringify(tbl)) + }); err != nil { + return err + } + } + r.Release() + return r.Err() +} + +// This options definition puts to() in the path of the CSV input. The tests +// get run in this case and they would normally pass, if we checked the +// results, but don't look at them. +const writeOptSource = ` +import "testing" +import c "csv" + +option testing.loadStorage = (csv) => { + return c.from(csv: csv) |> to(bucket: bucket) +} +` + +// This options definition is for the second run, the test run. It loads the +// data from previously written bucket. We check the results after running this +// second pass and report on them. +const readOptSource = ` +import "testing" +import c "csv" + +option testing.loadStorage = (csv) => { + return from(bucket: bucket) +} +` + +func prepareOptions(optionsSource string) *ast.File { + pkg := parser.ParseSource(optionsSource) + if ast.Check(pkg) > 0 { + panic(ast.GetError(pkg)) + } + return pkg.Files[0] +} + +func tryExec(cmd *cobra.Command) (err error) { + defer func() { + if e := recover(); e != nil { + var ok bool + err, ok = e.(error) + if !ok { + err = errors.New(fmt.Sprint(e)) + } + } + }() + err = cmd.Execute() + return +} + +func main() { + c := cmd.TestCommand(NewTestExecutor) + c.Use = "fluxtest-harness-influxdb" + if err := tryExec(c); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Tests failed: %v\n", err) + os.Exit(1) + } +} diff --git a/services/snapshotter/service_test.go b/services/snapshotter/service_test.go index dfc48a72b5b..a6f63f5c723 100644 --- a/services/snapshotter/service_test.go +++ b/services/snapshotter/service_test.go @@ -424,7 +424,7 @@ func NewTestService() (*snapshotter.Service, net.Listener, error) { } // The snapshotter needs to be used with a tcp.Mux listener. - mux := tcp.NewMux() + mux := tcp.NewMux(tcp.MuxLogger(os.Stderr)) go mux.Serve(l) s.Listener = mux.Listen(snapshotter.MuxHeader) diff --git a/tcp/mux.go b/tcp/mux.go index 38f2d9dc0c7..ad1e67198e3 100644 --- a/tcp/mux.go +++ b/tcp/mux.go @@ -7,7 +7,6 @@ import ( "io" "log" "net" - "os" "sync" "time" ) @@ -54,12 +53,16 @@ func (rc *replayConn) Read(b []byte) (int, error) { return 1, nil } +func MuxLogger(w io.Writer) *log.Logger { + return log.New(w, "[tcp] ", log.LstdFlags) +} + // NewMux returns a new instance of Mux. -func NewMux() *Mux { +func NewMux(log *log.Logger) *Mux { return &Mux{ m: make(map[byte]*listener), Timeout: DefaultTimeout, - Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags), + Logger: log, } } diff --git a/tcp/mux_test.go b/tcp/mux_test.go index 0b2930b69af..946afca89a9 100644 --- a/tcp/mux_test.go +++ b/tcp/mux_test.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "log" "net" + "os" "strings" "sync" "testing" @@ -37,11 +38,12 @@ func TestMux(t *testing.T) { defer tcpListener.Close() // Setup muxer & listeners. - mux := tcp.NewMux() - mux.Timeout = 200 * time.Millisecond - if !testing.Verbose() { - mux.Logger = log.New(ioutil.Discard, "", 0) + logger := log.New(ioutil.Discard, "", 0) + if testing.Verbose() { + logger = tcp.MuxLogger(os.Stderr) } + mux := tcp.NewMux(logger) + mux.Timeout = 200 * time.Millisecond errC := make(chan error) for i := uint8(0); i < n; i++ { @@ -150,7 +152,7 @@ func TestMux_Listen_ErrAlreadyRegistered(t *testing.T) { }() // Register two listeners with the same header byte. - mux := tcp.NewMux() + mux := tcp.NewMux(tcp.MuxLogger(os.Stderr)) mux.Listen(5) mux.Listen(5) } @@ -164,7 +166,7 @@ func TestMux_Close(t *testing.T) { } done := make(chan struct{}) - mux := tcp.NewMux() + mux := tcp.NewMux(tcp.MuxLogger(os.Stderr)) go func() { mux.Serve(listener) close(done) diff --git a/test-flux.sh b/test-flux.sh new file mode 100755 index 00000000000..e20e6831e8e --- /dev/null +++ b/test-flux.sh @@ -0,0 +1,51 @@ +#!/bin/bash +set -eu -o pipefail +readonly GO=${GO:-go} + +log() { + local now + now=$(date '+%Y/%m/%d %H:%M:%S') + echo "[${now}]" "$@" +} + +determine_flux_revision() { + local version revision + version=$("$GO" list -m -f '{{.Version}}' github.com/influxdata/flux) + revision=$(printf "%s" "${version}" | cut -d- -f 3) + if [[ ${revision} != "" ]]; then + printf "%s\n" "${revision}" + else + printf "%s\n" "${version}" + fi +} + +download_flux_archive() { + local revision + revision=$(determine_flux_revision) + log "Downloading flux archive (${revision})..." + curl -sLo flux.zip "https://github.com/influxdata/flux/archive/${revision}.zip" +} + +build_test_harness() { + log "Building test harness..." + "$GO" build -o fluxtest ./internal/cmd/fluxtest-harness-influxdb +} + +run_integration_tests() { + log "Running flux integration tests..." + ./fluxtest -v -p flux.zip + log "Running influxdb integration tests..." + ./fluxtest -v -p flux/stdlib +} + +cleanup() { + rm -f flux.zip fluxtest +} + +main() { + build_test_harness + download_flux_archive + run_integration_tests + cleanup +} +main diff --git a/test.sh b/test.sh index e26a782fdb3..0b8f9231abb 100755 --- a/test.sh +++ b/test.sh @@ -10,6 +10,7 @@ # 2: normal 32bit tests # 3: tsi build # count: print the number of test environments +# flux: run Flux e2e tests via the external test harness # *: to run all tests in parallel containers # # Logs from the test runs will be saved in OUTPUT_DIR, which defaults to ./test-logs @@ -54,12 +55,20 @@ function filename2imagename { echo ${1/Dockerfile/influxdb} } -# Run a test in a docker container -# Usage: run_test_docker +# Run go tests in a docker container +# Usage: run_test_docker ... function run_test_docker { - local dockerfile=$1 + local name=$1 + shift + run_docker "$name" /root/influxdb/build.py "${@}" --test --junit-report "--parallel=$PARALLELISM" "--timeout=$TIMEOUT" +} + +# Run a script in a docker container +# Usage: run_docker ... +function run_docker { + local dockerfile=Dockerfile_build_ubuntu64 local imagename=$(filename2imagename "$dockerfile") - shift + local name=$1 shift local logfile="$OUTPUT_DIR/${name}.log" @@ -76,8 +85,6 @@ function run_test_docker { -e "AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID" \ -e "AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY" \ "$imagename" \ - "--parallel=$PARALLELISM" \ - "--timeout=$TIMEOUT" \ "$@" \ 2>&1 | tee "$logfile" return "${PIPESTATUS[0]}" @@ -103,29 +110,33 @@ fi case $ENVIRONMENT_INDEX in 0) >&2 echo '64 bit tests' - run_test_docker Dockerfile_build_ubuntu64 test_64bit --test --junit-report + run_test_docker test_64bit rc=$? ;; 1) >&2 echo '64 bit race tests' GORACE="halt_on_error=1" - run_test_docker Dockerfile_build_ubuntu64 test_64bit_race --test --junit-report --race + run_test_docker test_64bit_race --race rc=$? ;; 2) >&2 echo 'tsi tests' INFLUXDB_DATA_INDEX_VERSION="tsi1" - run_test_docker Dockerfile_build_ubuntu64 test_64bit --test --junit-report + run_test_docker test_64bit_tsi rc=$? ;; "count") echo $ENV_COUNT ;; + "flux") + >&2 echo 'flux tests' + run_docker test_flux /root/influxdb/test-flux.sh + ;; *) echo "No individual test environment specified running tests for all $ENV_COUNT environments." # Run all test environments pids=() - for t in $(seq 0 "$(($ENV_COUNT - 1))") + for t in $(seq 0 "$(($ENV_COUNT - 1))") flux do $0 $t 2>&1 > /dev/null & # add PID to list diff --git a/tests/server_test.go b/tests/server_test.go index b51721bd773..7e2e1de17cb 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -30,6 +30,7 @@ import ( "github.com/influxdata/influxdb/cmd/influx/cli" "github.com/influxdata/influxdb/coordinator" fluxClient "github.com/influxdata/influxdb/flux/client" + _ "github.com/influxdata/influxdb/flux/init/static" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" "github.com/prometheus/prometheus/prompb" @@ -10368,7 +10369,7 @@ func runFluxBuiltinTest(t *testing.T, file *ast.File, u *url.URL, bucket string, inspectCalls := stdlib.TestingInspectCalls(pkg) if len(inspectCalls.Body) == 0 { - t.Skip("No tests in builtin test package") + t.Skip("No tests in init test package") } pkg.Files = append(pkg.Files, inspectCalls)