test: add fluxtest harness #21074

Merged · 23 commits · Mar 30, 2021
The diff below shows changes from 9 of the 23 commits.

Commits:
ab03eca
test: first pass at backporting flux-test harness
danxmoran Mar 25, 2021
adc3d63
test: add script to run flux tests
danxmoran Mar 25, 2021
d26687e
test: add config toggle to enable test capabilities in Flux controller
danxmoran Mar 25, 2021
0ea1767
test: explicitly set index type to fix many tests
danxmoran Mar 25, 2021
6472f2b
refactor: remove config for enabling flux testing options
danxmoran Mar 25, 2021
f9ea07d
fix: remove old parameter from test
danxmoran Mar 25, 2021
259bf49
refactor: get all tests but 1 working, remove init hack
danxmoran Mar 25, 2021
fd736d5
feat(flux): add MergeFiltersRule to pass fluxtest
danxmoran Mar 25, 2021
1ce05fe
refactor: replace existing 'from' rewrite logic with a planner rule
danxmoran Mar 26, 2021
92ac2bb
build: bump existing Dockerfiles to go 1.15
danxmoran Mar 26, 2021
5634574
build: add flux tests to CI
danxmoran Mar 26, 2021
75dfeb5
refactor: allow for overriding tcp.Mux logger
danxmoran Mar 26, 2021
474e7aa
test: run flux e2e tests against both inmem and tsi1 indexes
danxmoran Mar 26, 2021
b2522b7
chore: fix formatting
danxmoran Mar 26, 2021
c2bed12
build: upgrade to Flux v0.110.0
danxmoran Mar 29, 2021
72583b5
build: consolidate Dockerfiles, upgrade ubuntu and python
danxmoran Mar 29, 2021
bbd4c88
refactor: clean up redundant code
danxmoran Mar 29, 2021
25b81bc
chore: add some comments on new methods
danxmoran Mar 29, 2021
7c9e948
test: fix test case after deleting struct
danxmoran Mar 29, 2021
3f31972
build: delete unused Dockerfile
danxmoran Mar 29, 2021
0882048
build: clean up functions in test script
danxmoran Mar 29, 2021
4f4c355
build: upgrade to latest Flux
danxmoran Mar 30, 2021
f9416ff
chore: update CHANGELOG
danxmoran Mar 30, 2021
4 changes: 4 additions & 0 deletions .gitignore
@@ -12,6 +12,10 @@ config.json
 # ignore generated files.
 cmd/influxd/version.go

+# Flux test harness artifacts
+fluxtest
+flux.zip
+
 # executables

 *.test
6 changes: 6 additions & 0 deletions cmd/influxd/run/command.go
@@ -15,6 +15,7 @@ import (
 	"strconv"
 	"time"

+	fluxinit "github.com/influxdata/influxdb/flux/init"
 	"github.com/influxdata/influxdb/logger"
 	"go.uber.org/zap"
 )
@@ -101,6 +102,11 @@ func (cmd *Command) Run(args ...string) error {
 		go func() { http.ListenAndServe("localhost:6060", nil) }()
 	}

+	// Initialize the Flux built-ins if enabled.
+	if config.HTTPD.FluxEnabled {
+		fluxinit.Initialize()
+	}
+
 	// Print sweet InfluxDB logo.
 	if !config.Logging.SuppressLogo && logger.IsTerminal(cmd.Stdout) {
 		fmt.Fprint(cmd.Stdout, logo)
51 changes: 51 additions & 0 deletions etc/test-flux.sh
@@ -0,0 +1,51 @@
#!/bin/bash
set -eu -o pipefail
readonly GO=${GO:-go}

log() {
	local now
	now=$(date '+%Y/%m/%d %H:%M:%S')
	echo "[${now}]" "$@"
}

determine_flux_revision() {
	local version revision
	version=$("$GO" list -m -f '{{.Version}}' github.com/influxdata/flux)
	revision=$(printf "%s" "${version}" | cut -d- -f 3)
	if [[ ${revision} != "" ]]; then
		printf "%s\n" "${revision}"
	else
		printf "%s\n" "${version}"
	fi
}

download_flux_archive() {
	local revision
	revision=$(determine_flux_revision)
	log "Downloading flux archive (${revision})..."
	curl -sLo flux.zip "https://github.com/influxdata/flux/archive/${revision}.zip"
}

build_test_harness() {
	log "Building test harness..."
	"$GO" build -o fluxtest ./internal/cmd/fluxtest-harness-influxdb
}

run_integration_tests() {
	log "Running flux integration tests..."
	./fluxtest -v -p flux.zip
	log "Running influxdb integration tests..."
	./fluxtest -v -p flux/stdlib
}

cleanup() {
	rm -f flux.zip fluxtest
}

main() {
	build_test_harness
	download_flux_archive
	run_integration_tests
	cleanup
}
main
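The one subtle step in the script is determine_flux_revision: for unreleased module commits, Go reports pseudo-versions whose third dash-separated field is the commit SHA, and the script extracts it so the matching source archive can be downloaded. A rough Go equivalent of that parsing, shown only to illustrate the logic (the helper name and example versions here are invented):

package main

import (
	"fmt"
	"strings"
)

// fluxRevision mirrors determine_flux_revision in the script above: for a
// Go pseudo-version like v0.110.0-0.20210326173646-abcdef123456, the third
// dash-separated field is the commit SHA; otherwise fall back to the
// version string itself.
func fluxRevision(version string) string {
	parts := strings.Split(version, "-")
	if len(parts) >= 3 && parts[2] != "" {
		return parts[2]
	}
	return version
}

func main() {
	fmt.Println(fluxRevision("v0.110.0"))                               // v0.110.0
	fmt.Println(fluxRevision("v0.110.0-0.20210326173646-abcdef123456")) // abcdef123456
}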
6 changes: 2 additions & 4 deletions flux/control/controller.go
@@ -5,11 +5,11 @@ import (
 	"time"

 	"github.com/influxdata/flux"
+	"github.com/influxdata/flux/dependencies/testing"
 	"github.com/influxdata/flux/lang"
 	"github.com/influxdata/flux/memory"
 	"github.com/influxdata/flux/runtime"
 	"github.com/influxdata/influxdb/coordinator"
-	"github.com/influxdata/influxdb/flux/builtin"
 	"github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
@@ -72,15 +72,13 @@ func (p *program) Start(ctx context.Context, allocator *memory.Allocator) (flux.
 }

 func NewController(mc MetaClient, reader influxdb.Reader, auth Authorizer, authEnabled bool, writer influxdb.PointsWriter, logger *zap.Logger) *Controller {
-	builtin.Initialize()
-
 	storageDeps, err := influxdb.NewDependencies(mc, reader, auth, authEnabled, writer)
 	if err != nil {
 		panic(err)
 	}

 	return &Controller{
-		deps:   []flux.Dependency{storageDeps},
+		deps:   append([]flux.Dependency{storageDeps}, testing.FrameworkConfig{}),
 		logger: logger,
 	}
 }
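The functional change here is small but central to the harness: testing.FrameworkConfig{} is appended to the controller's dependency list, which is what enables Flux's testcase capabilities at query time. As a rough, self-contained sketch of the mechanism (the types below are stand-ins, not the real flux types; flux.Dependency's actual contract is an Inject method that threads values through the context):

package main

import (
	"context"
	"fmt"
)

// Dependency mirrors the shape of flux.Dependency: a value that can
// inject itself into the execution context before a query runs.
type Dependency interface {
	Inject(ctx context.Context) context.Context
}

type ctxKey string

// frameworkConfig stands in for testing.FrameworkConfig from the diff.
type frameworkConfig struct{}

func (frameworkConfig) Inject(ctx context.Context) context.Context {
	return context.WithValue(ctx, ctxKey("testing-framework"), true)
}

func main() {
	deps := []Dependency{frameworkConfig{}}
	ctx := context.Background()
	// At query time each dependency is injected into the context, which
	// is how appending one value to deps switches a capability on.
	for _, d := range deps {
		ctx = d.Inject(ctx)
	}
	fmt.Println(ctx.Value(ctxKey("testing-framework"))) // true
}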
15 changes: 6 additions & 9 deletions flux/builtin/builtin.go → flux/init/init.go
@@ -1,23 +1,20 @@
-// Package builtin ensures all packages related to Flux built-ins are imported and initialized.
+// Package init ensures all packages related to Flux built-ins are imported and initialized.
 // This should only be imported from main or test packages.
 // It is a mistake to import it from any other package.
-package builtin
+//
+// NOTE: This is a superset-wrapper of Flux's built-in initialization logic.
+// It also ensures V1-specific flux builtins are initialized.
+package init

 import (
-	"sync"
-
 	"github.com/influxdata/flux/runtime"
 	_ "github.com/influxdata/flux/stdlib"
 	_ "github.com/influxdata/influxdb/flux/stdlib"
 )

-var once sync.Once
-
 // Initialize ensures all Flux builtins are configured and should be called
 // prior to using the Flux runtime. Initialize is safe to call concurrently
 // and is idempotent.
 func Initialize() {
-	once.Do(func() {
-		runtime.FinalizeBuiltIns()
-	})
+	runtime.FinalizeBuiltIns()
 }
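With the sync.Once removed, initialization is now explicitly the main (or test) package's job. Based on the call added to cmd/influxd/run/command.go earlier in this diff, usage looks roughly like this (a minimal sketch; the server startup is elided):

package main

import (
	fluxinit "github.com/influxdata/influxdb/flux/init"
)

func main() {
	// Finalize the Flux built-ins before compiling any Flux query.
	// influxd gates this on config.HTTPD.FluxEnabled (see the
	// command.go hunk earlier in this diff).
	fluxinit.Initialize()
	// ... start the server ...
}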
9 changes: 9 additions & 0 deletions flux/init/static/static.go
@@ -0,0 +1,9 @@
// The init/static package can be imported in test cases and other use
// cases where it is okay to always initialize flux.
package static

import fluxinit "github.com/influxdata/influxdb/flux/init"

func init() {
	fluxinit.Initialize()
}
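Because this package's init() calls fluxinit.Initialize(), a blank import is all a test file needs. A hypothetical test (package and test names invented for illustration):

package example_test

import (
	"testing"

	// Imported for its side effect: initializes the Flux runtime.
	_ "github.com/influxdata/influxdb/flux/init/static"
)

func TestNeedsFluxRuntime(t *testing.T) {
	// The Flux built-ins are already finalized here; no explicit
	// Initialize() call is required.
}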
41 changes: 41 additions & 0 deletions flux/stdlib/influxdata/influxdb/filter_test.flux
@@ -0,0 +1,41 @@
package influxdb_test

import "csv"
import "testing"
import "testing/expect"

option now = () => (2030-01-01T00:00:00Z)

input = "#datatype,string,long,dateTime:RFC3339,string,string,string,double
#group,false,false,false,true,true,true,false
#default,_result,,,,,,
,result,table,_time,_measurement,host,_field,_value
,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83
,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.63
,,1,2018-05-22T19:53:26Z,system,host.local,load3,1.72
,,2,2018-05-22T19:53:26Z,system,host.local,load4,1.77
,,2,2018-05-22T19:53:36Z,system,host.local,load4,1.78
,,2,2018-05-22T19:53:46Z,system,host.local,load4,1.77
"

testcase filter {
	expect.planner(rules: [
		"influxdata/influxdb.FromStorageRule": 1,
		"PushDownRangeRule": 1,
		"PushDownFilterRule": 1,
	])

	want = csv.from(csv: "#datatype,string,long,dateTime:RFC3339,string,string,string,double
#group,false,false,false,true,true,true,false
#default,_result,,,,,,
,result,table,_time,_measurement,host,_field,_value
,,0,2018-05-22T19:53:26Z,system,host.local,load1,1.83
,,0,2018-05-22T19:53:36Z,system,host.local,load1,1.63
")

	got = testing.loadStorage(csv: input)
		|> range(start: -100y)
		|> filter(fn: (r) => r._measurement == "system" and r._field == "load1")
		|> drop(columns: ["_start", "_stop"])
	testing.diff(want, got)
}
93 changes: 14 additions & 79 deletions flux/stdlib/influxdata/influxdb/from.go
@@ -6,97 +6,32 @@ import (
 	"github.com/influxdata/flux"
 	"github.com/influxdata/flux/codes"
 	"github.com/influxdata/flux/plan"
-	"github.com/influxdata/flux/runtime"
 	"github.com/influxdata/flux/stdlib/influxdata/influxdb"
 )

 const FromKind = "influxDBFrom"

-type FromOpSpec struct {
-	Bucket   string `json:"bucket,omitempty"`
-	BucketID string `json:"bucketID,omitempty"`
-}
-
-func init() {
danxmoran (Contributor, Author) commented on the deleted init():

	This logic was performing the same rewrite as the new FromStorageRule pushdown. I thought it'd be good to refactor into a 1:1 match with the 2.x code, so future back-/forward-ports have fewer differences to think through.

-	fromSignature := runtime.MustLookupBuiltinType("influxdata/influxdb", "from")
-	runtime.ReplacePackageValue("influxdata/influxdb", influxdb.FromKind, flux.MustValue(flux.FunctionValue(FromKind, createFromOpSpec, fromSignature)))
-	flux.RegisterOpSpec(FromKind, newFromOp)
-	plan.RegisterProcedureSpec(FromKind, newFromProcedure, FromKind)
-}
-
-func createFromOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
-	spec := new(FromOpSpec)
-
-	if bucket, ok, err := args.GetString("bucket"); err != nil {
-		return nil, err
-	} else if ok {
-		spec.Bucket = bucket
-	}
-
-	if bucketID, ok, err := args.GetString("bucketID"); err != nil {
-		return nil, err
-	} else if ok {
-		spec.BucketID = bucketID
-	}
-
-	if spec.Bucket == "" && spec.BucketID == "" {
-		return nil, &flux.Error{
-			Code: codes.Invalid,
-			Msg:  "must specify one of bucket or bucketID",
-		}
-	}
-	if spec.Bucket != "" && spec.BucketID != "" {
-		return nil, &flux.Error{
-			Code: codes.Invalid,
-			Msg:  "must specify only one of bucket or bucketID",
-		}
-	}
-	return spec, nil
-}
-
-func newFromOp() flux.OperationSpec {
-	return new(FromOpSpec)
-}
-
-func (s *FromOpSpec) Kind() flux.OperationKind {
-	return FromKind
-}
-
-type FromProcedureSpec struct {
-	Bucket   string
-	BucketID string
-}
-
-func newFromProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
-	spec, ok := qs.(*FromOpSpec)
-	if !ok {
-		return nil, &flux.Error{
-			Code: codes.Internal,
-			Msg:  fmt.Sprintf("invalid spec type %T", qs),
-		}
-	}
+type (
+	NameOrID   = influxdb.NameOrID
+	FromOpSpec = influxdb.FromOpSpec
+)

-	return &FromProcedureSpec{
-		Bucket:   spec.Bucket,
-		BucketID: spec.BucketID,
-	}, nil
+type FromStorageProcedureSpec struct {
+	Bucket influxdb.NameOrID
 }

-func (s *FromProcedureSpec) Kind() plan.ProcedureKind {
+func (s *FromStorageProcedureSpec) Kind() plan.ProcedureKind {
 	return FromKind
 }

-func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
-	ns := new(FromProcedureSpec)
-
+func (s *FromStorageProcedureSpec) Copy() plan.ProcedureSpec {
+	ns := new(FromStorageProcedureSpec)
 	ns.Bucket = s.Bucket
-	ns.BucketID = s.BucketID
-
 	return ns
 }

-func (s *FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
-	// FromProcedureSpec is a logical operation representing any read
+func (s *FromStorageProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
+	// FromStorageProcedureSpec is a logical operation representing any read
 	// from storage. However as a logical operation, it doesn't specify
 	// how data is to be read from storage. It is the query planner's
 	// job to determine the optimal read strategy and to convert this
@@ -108,10 +43,10 @@ func (s *FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
 	// not support unbounded reads, and so this query must not be
 	// validated.
 	var bucket string
-	if len(s.Bucket) > 0 {
-		bucket = s.Bucket
+	if len(s.Bucket.Name) > 0 {
+		bucket = s.Bucket.Name
 	} else {
-		bucket = s.BucketID
+		bucket = s.Bucket.ID
 	}
 	return &flux.Error{
 		Code: codes.Invalid,
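The FromStorageRule mentioned in the review comment above (and counted by the expect.planner assertion in filter_test.flux) is not part of this hunk. For orientation, a hedged sketch of what such a planner rule generally looks like against flux's plan.Rule interface — the rule name and spec types come from this PR, but the body below is illustrative, not the PR's exact implementation:

package influxdb

import (
	"context"

	"github.com/influxdata/flux/plan"
	"github.com/influxdata/flux/stdlib/influxdata/influxdb"
)

// FromStorageRule (sketch) rewrites the stdlib `from` procedure into the
// storage-specific FromStorageProcedureSpec defined in from.go above.
type FromStorageRule struct{}

func (FromStorageRule) Name() string {
	return "influxdata/influxdb.FromStorageRule"
}

func (FromStorageRule) Pattern() plan.Pattern {
	return plan.Pat(influxdb.FromKind)
}

func (FromStorageRule) Rewrite(ctx context.Context, node plan.Node) (plan.Node, bool, error) {
	spec := node.ProcedureSpec().(*influxdb.FromProcedureSpec)
	if err := node.ReplaceSpec(&FromStorageProcedureSpec{Bucket: spec.Bucket}); err != nil {
		return nil, false, err
	}
	return node, true, nil
}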
43 changes: 43 additions & 0 deletions flux/stdlib/influxdata/influxdb/from_test.go
@@ -1 +1,44 @@
package influxdb_test

import (
	"context"
	"testing"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/plan"
	"github.com/influxdata/flux/plan/plantest"
	"github.com/influxdata/flux/stdlib/influxdata/influxdb"
	"github.com/influxdata/flux/stdlib/universe"
	qinfluxdb "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
	"github.com/stretchr/testify/require"
)

func TestFromValidation(t *testing.T) {
	spec := plantest.PlanSpec{
		// from |> group (cannot query an infinite time range)
		Nodes: []plan.Node{
			plan.CreateLogicalNode("from", &influxdb.FromProcedureSpec{
				Bucket: influxdb.NameOrID{Name: "my-bucket"},
			}),
			plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
				GroupMode: flux.GroupModeBy,
				GroupKeys: []string{"_measurement", "_field"},
			}),
		},
		Edges: [][2]int{
			{0, 1},
		},
	}

	ps := plantest.CreatePlanSpec(&spec)
	pp := plan.NewPhysicalPlanner(plan.OnlyPhysicalRules(
		qinfluxdb.FromStorageRule{},
		qinfluxdb.PushDownRangeRule{},
		qinfluxdb.PushDownFilterRule{},
		qinfluxdb.PushDownGroupRule{},
	))
	_, err := pp.Plan(context.Background(), ps)
	require.Error(t, err, "Expected query with no call to range to fail physical planning")
	want := `cannot submit unbounded read to "my-bucket"; try bounding 'from' with a call to 'range'`
	require.Equal(t, want, err.Error())
}