Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make sure the query plan nodes have unique ids #3297

Merged
merged 5 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions execute/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,21 +402,25 @@ func (s *QueryProfiler) getTableBuilder(q flux.Query, alloc *memory.Allocator) (
stats.TotalAllocated,
strings.Join(stats.RuntimeErrors, "\n"),
}
stats.Metadata.Range(func(key string, value interface{}) bool {
for key, values := range stats.Metadata {
var ty flux.ColType
if intValue, ok := value.(int); ok {
if intValue, ok := values[0].(int); ok {
ty = flux.TInt
colData = append(colData, int64(intValue))
} else {
ty = flux.TString
colData = append(colData, fmt.Sprintf("%v", value))
var data string
for _, value := range values {
valueStr := fmt.Sprintf("%v", value)
data += valueStr + "\n"
}
colData = append(colData, data)
}
colMeta = append(colMeta, flux.ColMeta{
Label: key,
Type: ty,
})
return true
})
}
for _, col := range colMeta {
if _, err := b.AddCol(col); err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion execute/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ func TestQueryProfiler_GetResult(t *testing.T) {
#default,_profiler,,,,,,,,,,,,,,,
,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values
,,0,profiler/query,1,2,3,4,5,6,7,8,9,"1
2","query plan",10,11
2","query plan
",10,11
`
q.Done()
tbl, err := p.GetResult(q, &memory.Allocator{})
Expand Down
15 changes: 11 additions & 4 deletions internal/spec/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/plan"
"github.com/opentracing/opentracing-go"
)

type ider struct {
id int
id *int
lookup map[*flux.TableObject]flux.OperationID
}

func (i *ider) nextID() int {
next := i.id
i.id++
next := *i.id
*i.id++
return next
}

Expand All @@ -44,8 +45,14 @@ func (i *ider) ID(t *flux.TableObject) flux.OperationID {
}

func FromEvaluation(ctx context.Context, ses []interpreter.SideEffect, now time.Time) (*flux.Spec, error) {
var nextNodeID *int
if value := ctx.Value(plan.NextPlanNodeIDKey); value != nil {
nextNodeID = value.(*int)
} else {
nextNodeID = new(int)
}
ider := &ider{
id: 0,
id: nextNodeID,
lookup: make(map[*flux.TableObject]flux.OperationID),
}

Expand Down
2 changes: 2 additions & 0 deletions lang/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ func (p *AstProgram) Start(ctx context.Context, alloc *memory.Allocator) (flux.Q
// function calls during the evaluation phase (see `tableFind`).
deps := execute.NewExecutionDependencies(alloc, &p.Now, p.Logger)
ctx = deps.Inject(ctx)
nextPlanNodeID := new(int)
ctx = context.WithValue(ctx, plan.NextPlanNodeIDKey, nextPlanNodeID)

// Evaluation.
sp, scope, err := p.getSpec(ctx, alloc)
Expand Down
142 changes: 142 additions & 0 deletions lang/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package lang_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux"
_ "github.com/influxdata/flux/builtin"
"github.com/influxdata/flux/execute/executetest"
Expand Down Expand Up @@ -167,3 +169,143 @@ csv.from(csv: data) |> map(fn: (r) => r.nonexistent)`
t.Fatalf("unexpected error from query execution: %s", q.Err())
}
}

// This test verifies that when a query involves table functions or chain(), the plan nodes
// the main query generates does not reuse the node IDs that are already used by the table
// functions or chain()
func TestPlanNodeUniqueness(t *testing.T) {
prelude := `
import "experimental/array"
import "experimental"

data = array.from(rows: [{
_measurement: "command",
_field: "id",
_time: 2018-12-19T22:13:30Z,
_value: 12,
}, {
_measurement: "command",
_field: "id",
_time: 2018-12-19T22:13:40Z,
_value: 23,
}, {
_measurement: "command",
_field: "id",
_time: 2018-12-19T22:13:50Z,
_value: 34,
}, {
_measurement: "command",
_field: "guild",
_time: 2018-12-19T22:13:30Z,
_value: 12,
}, {
_measurement: "command",
_field: "guild",
_time: 2018-12-19T22:13:40Z,
_value: 23,
}, {
_measurement: "command",
_field: "guild",
_time: 2018-12-19T22:13:50Z,
_value: 34,
}])
`
tcs := []struct {
name string
script string
want string
}{
{
name: "chain",
script: `
id = data
|> range(start: 2018-12-19T22:13:30Z, stop: 2018-12-19T22:14:21Z)
|> filter(fn: (r) => r["_field"] == "id")

guild = data
|> range(start: 2018-12-19T22:13:30Z, stop: 2018-12-19T22:14:21Z)
|> filter(fn: (r) => r["_field"] == "guild")

experimental.chain(first: id, second: guild)
`,
want: `[digraph {
experimental/array.from0
range1
filter2
// r._field == "id"
generated_yield

experimental/array.from0 -> range1
range1 -> filter2
filter2 -> generated_yield
}
digraph {
experimental/array.from3
range4
filter5
// r._field == "guild"
generated_yield

experimental/array.from3 -> range4
range4 -> filter5
filter5 -> generated_yield
}
]`,
},
{
name: "tableFns",
script: `
ids = data
|> range(start: 2018-12-19T22:13:30Z, stop: 2018-12-19T22:14:21Z)
|> filter(fn: (r) => r["_field"] == "id")
|> sort()
|> tableFind(fn: (key) => true)
|> getColumn(column: "_field")

id = ids[0]

data
|> range(start: 2018-12-19T22:13:30Z, stop: 2018-12-19T22:14:21Z)
|> filter(fn: (r) => r["_field"] == id)
`,
want: `[digraph {
experimental/array.from0
range1
filter2
// r._field == "id"
sort3
generated_yield

experimental/array.from0 -> range1
range1 -> filter2
filter2 -> sort3
sort3 -> generated_yield
}
digraph {
experimental/array.from4
range5
filter6
// r._field == "id"
generated_yield

experimental/array.from4 -> range5
range5 -> filter6
filter6 -> generated_yield
}
]`,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
if q, err := runQuery(prelude + tc.script); err != nil {
t.Error(err)
} else {
got := fmt.Sprintf("%v", q.Statistics().Metadata["flux/query-plan"])
if !cmp.Equal(tc.want, got) {
t.Errorf("unexpected value -want/+got\n%s", cmp.Diff(tc.want, got))
}
}
})
}
}
1 change: 1 addition & 0 deletions libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/join_agg_test.flux": "8ab6a33469a50645e41deaaa1f87c1e4c4180b5d79dd87b03d6b4b1012f8ade9",
"stdlib/universe/join_missing_on_col_test.flux": "98f4ca3999b1379d3a35f836449232e3b664fe312e5485179be57e4cc64e6ef4",
"stdlib/universe/join_test.flux": "bdbbc60918fb9b683d9975816a9a9c59e2d7d1436847696b01414d740beffec3",
"stdlib/universe/join_two_same_sources_test.flux": "0d598a25c72d00ea9b06b9bfbdf8ff790e8b6ffd3aacd267a483438727c0f9c5",
"stdlib/universe/join_use_previous_test.flux": "81fcaa31ff9a9a2a06a35a9275daf73cef0434061e8e81cb8317ccae172f7378",
"stdlib/universe/kama_test.flux": "c84bfe6689f42f8bba75b9e06a4b1cb8e441615266ad0d12201d9fff27e34994",
"stdlib/universe/kama_v2_test.flux": "f9d073089fdfd51c2260167d5e30b05de67ee3e4d91bc5e4261ed1ca722dfbc6",
Expand Down
12 changes: 12 additions & 0 deletions plan/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,18 @@ func CreatePhysicalNode(id NodeID, spec PhysicalProcedureSpec) *PhysicalPlanNode
}
}

const NextPlanNodeIDKey = "NextPlanNodeID"

func CreateUniquePhysicalNode(ctx context.Context, prefix string, spec PhysicalProcedureSpec) *PhysicalPlanNode {
if value := ctx.Value(NextPlanNodeIDKey); value != nil {
nextNodeID := value.(*int)
id := NodeID(fmt.Sprintf("%s%d", prefix, *nextNodeID))
*nextNodeID++
return CreatePhysicalNode(id, spec)
}
return CreatePhysicalNode(NodeID(prefix), spec)
}

// PostPhysicalValidator provides an interface that can be implemented by PhysicalProcedureSpecs for any
// validation checks to be performed post-physical planning.
type PostPhysicalValidator interface {
Expand Down
4 changes: 2 additions & 2 deletions stdlib/influxdata/influxdb/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (p FromRemoteRule) Rewrite(ctx context.Context, node plan.Node) (plan.Node,
config.Token = *spec.Token
}

return plan.CreatePhysicalNode("fromRemote", &FromRemoteProcedureSpec{
return plan.CreateUniquePhysicalNode(ctx, "fromRemote", &FromRemoteProcedureSpec{
Config: config,
}), true, nil
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func (p BucketsRemoteRule) Rewrite(ctx context.Context, node plan.Node) (plan.No
return node, false, nil
}

return plan.CreatePhysicalNode("bucketsRemote", &BucketsRemoteProcedureSpec{
return plan.CreateUniquePhysicalNode(ctx, "bucketsRemote", &BucketsRemoteProcedureSpec{
BucketsProcedureSpec: spec,
}), true, nil
}
Expand Down
77 changes: 77 additions & 0 deletions stdlib/influxdata/influxdb/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,83 @@ import (
"github.com/influxdata/flux/values/valuestest"
)

func TestRuleCreatedNodeUniqueness(t *testing.T) {
nextPlanNodeID := 3
ctx := context.WithValue(context.Background(), plan.NextPlanNodeIDKey, &nextPlanNodeID)
host, token := "localhost", "token"
bucketsProcedureSpec := &influxdb.BucketsProcedureSpec{
Org: &influxdb.NameOrID{Name: "influxdata"},
Host: &host,
Token: &token,
}
joinSpec := &universe.MergeJoinProcedureSpec{
TableNames: []string{"a", "b"},
On: []string{"_value"},
}
fromSpec := &influxdb.FromProcedureSpec{
Bucket: influxdb.NameOrID{Name: "my-bucket"},
Host: &host,
}
fromRemoteSpec := &influxdb.FromRemoteProcedureSpec{
Config: influxdb.Config{Bucket: influxdb.NameOrID{Name: "my-bucket"}, Host: "localhost"},
}
joinEdges := [][2]int{{0, 2}, {1, 2}}
tcs := []plantest.RuleTestCase{
{
Name: "BucketsRemoteJoin",
Context: ctx,
Rules: []plan.Rule{influxdb.BucketsRemoteRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("buckets0", bucketsProcedureSpec),
plan.CreateLogicalNode("buckets1", bucketsProcedureSpec),
plan.CreateLogicalNode("join2", joinSpec),
},
Edges: joinEdges,
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("bucketsRemote3", &influxdb.BucketsRemoteProcedureSpec{
BucketsProcedureSpec: bucketsProcedureSpec,
}),
plan.CreatePhysicalNode("bucketsRemote4", &influxdb.BucketsRemoteProcedureSpec{
BucketsProcedureSpec: bucketsProcedureSpec,
}),
plan.CreatePhysicalNode("join2", joinSpec),
},
Edges: joinEdges,
},
},
{
Name: "FromRemoteTableJoin",
Context: ctx,
Rules: []plan.Rule{influxdb.FromRemoteRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("from0", fromSpec),
plan.CreateLogicalNode("from1", fromSpec),
plan.CreateLogicalNode("join2", joinSpec),
},
Edges: joinEdges,
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("fromRemote5", fromRemoteSpec),
plan.CreatePhysicalNode("fromRemote6", fromRemoteSpec),
plan.CreatePhysicalNode("join2", joinSpec),
},
Edges: joinEdges,
},
},
}

for _, tc := range tcs {
t.Run(tc.Name, func(t *testing.T) {
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}

func TestFromRemoteRule_WithHost(t *testing.T) {
fromSpec := influxdb.FromProcedureSpec{
Org: &influxdb.NameOrID{Name: "influxdata"},
Expand Down
2 changes: 1 addition & 1 deletion stdlib/influxdata/influxdb/v1/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (p DatabasesRemoteRule) Rewrite(ctx context.Context, node plan.Node) (plan.
return node, false, nil
}

return plan.CreatePhysicalNode("databasesRemote", &DatabasesRemoteProcedureSpec{
return plan.CreateUniquePhysicalNode(ctx, "databasesRemote", &DatabasesRemoteProcedureSpec{
DatabasesProcedureSpec: spec,
}), true, nil
}
Loading