diff --git a/cmd/influxd/launcher/query_test.go b/cmd/influxd/launcher/query_test.go index d38848b5044..5722bdd219d 100644 --- a/cmd/influxd/launcher/query_test.go +++ b/cmd/influxd/launcher/query_test.go @@ -21,6 +21,7 @@ import ( "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/execute/table" "github.com/influxdata/flux/lang" + "github.com/influxdata/flux/memory" "github.com/influxdata/flux/runtime" "github.com/influxdata/flux/values" "github.com/influxdata/influxdb/v2" @@ -752,6 +753,193 @@ from(bucket: "%s") } } +type TestQueryProfiler struct{ + start int64 +} + +func (s TestQueryProfiler) Name() string { + return fmt.Sprintf("query%d", s.start) +} + +func (s TestQueryProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) { + groupKey := execute.NewGroupKey( + []flux.ColMeta{ + { + Label: "_measurement", + Type: flux.TString, + }, + }, + []values.Value{ + values.NewString(fmt.Sprintf("profiler/query%d", s.start)), + }, + ) + b := execute.NewColListTableBuilder(groupKey, alloc) + colMeta := []flux.ColMeta{ + { + Label: "_measurement", + Type: flux.TString, + }, + { + Label: "TotalDuration", + Type: flux.TInt, + }, + { + Label: "CompileDuration", + Type: flux.TInt, + }, + { + Label: "QueueDuration", + Type: flux.TInt, + }, + { + Label: "PlanDuration", + Type: flux.TInt, + }, + { + Label: "RequeueDuration", + Type: flux.TInt, + }, + { + Label: "ExecuteDuration", + Type: flux.TInt, + }, + { + Label: "Concurrency", + Type: flux.TInt, + }, + { + Label: "MaxAllocated", + Type: flux.TInt, + }, + { + Label: "TotalAllocated", + Type: flux.TInt, + }, + { + Label: "RuntimeErrors", + Type: flux.TString, + }, + { + Label: "influxdb/scanned-bytes", + Type: flux.TInt, + }, + { + Label: "influxdb/scanned-values", + Type: flux.TInt, + }, + { + Label: "flux/query-plan", + Type: flux.TString, + }, + } + colData := []interface{} { + fmt.Sprintf("profiler/query%d", s.start), + s.start, + s.start + 1, + s.start + 2, + s.start + 3, + s.start + 4, + s.start + 5, + s.start + 6, + s.start + 7, + s.start + 8, + "error1\nerror2", + s.start + 9, + s.start + 10, + "query plan", + } + for _, col := range colMeta { + if _, err := b.AddCol(col); err != nil { + return nil, err + } + } + for i := 0; i < len(colData); i++ { + if intValue, ok := colData[i].(int64); ok { + b.AppendInt(i, intValue) + } else { + b.AppendString(i, colData[i].(string)) + } + } + tbl, err := b.Table() + if err != nil { + return nil, err + } + return tbl, nil +} + +func TestFluxProfiler(t *testing.T) { + testcases := []struct { + name string + data []string + query string + want string + }{ + { + name: "range last single point start time", + data: []string{ + "m,tag=a f=1i 1", + }, + query: ` +option profiler.enabledProfilers = ["query0", "query100", "query100", "NonExistentProfiler"] +from(bucket: v.bucket) + |> range(start: 1970-01-01T00:00:00.000000001Z, stop: 1970-01-01T01:00:00Z) + |> last() +`, + want: ` +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string +#group,false,false,true,true,false,false,true,true,true +#default,_result,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,tag +,,0,1970-01-01T00:00:00.000000001Z,1970-01-01T01:00:00Z,1970-01-01T00:00:00.000000001Z,1,f,m,a + +#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,string,long,long +#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false +#default,_profiler,,,,,,,,,,,,,,, +,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values +,,0,profiler/query0,0,1,2,3,4,5,6,7,8,"error1 +error2","query plan",9,10 +,,1,profiler/query100,100,101,102,103,104,105,106,107,108,"error1 +error2","query plan",109,110 +`, + }, + } + execute.RegisterProfilers(&TestQueryProfiler{}, &TestQueryProfiler{start: 100}) + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + l := launcher.RunTestLauncherOrFail(t, ctx, nil) + + l.SetupOrFail(t) + defer l.ShutdownOrFail(t, ctx) + + l.WritePointsOrFail(t, strings.Join(tc.data, "\n")) + + queryStr := "import \"profiler\"\nv = {bucket: " + "\"" + l.Bucket.Name + "\"" + "}\n" + tc.query + req := &query.Request{ + Authorization: l.Auth, + OrganizationID: l.Org.ID, + Compiler: lang.FluxCompiler{ + Query: queryStr, + }, + } + if got, err := l.FluxQueryService().Query(ctx, req); err != nil { + t.Error(err) + } else { + dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{}) + want, err := dec.Decode(ioutil.NopCloser(strings.NewReader(tc.want))) + if err != nil { + t.Fatal(err) + } + defer want.Release() + + if err := executetest.EqualResultIterators(want, got); err != nil { + t.Fatal(err) + } + } + }) + } +} + func TestQueryPushDowns(t *testing.T) { testcases := []struct { name string diff --git a/query/bridges.go b/query/bridges.go index 9d3c429b75f..564f5546fca 100644 --- a/query/bridges.go +++ b/query/bridges.go @@ -149,6 +149,15 @@ func (b ProxyQueryServiceAsyncBridge) Query(ctx context.Context, w io.Writer, re if err != nil { return stats, tracing.LogError(span, err) } + + if results, err := q.ProfilerResults(); err != nil { + return stats, tracing.LogError(span, err) + } else if results != nil { + _, err = encoder.Encode(w, results) + if err != nil { + return stats, tracing.LogError(span, err) + } + } return stats, nil } diff --git a/query/control/controller.go b/query/control/controller.go index f6819b95f63..503992a0b3d 100644 --- a/query/control/controller.go +++ b/query/control/controller.go @@ -26,6 +26,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/codes" + "github.com/influxdata/flux/execute/table" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/runtime" @@ -548,6 +549,23 @@ type Query struct { alloc *memory.Allocator } +func (q *Query) ProfilerResults() (flux.ResultIterator, error) { + p := q.program.(*lang.AstProgram) + if len(p.Profilers) == 0 { + return nil, nil + } + tables := make([]flux.Table, 0) + for _, profiler := range p.Profilers { + if result, err := profiler.GetResult(q, q.alloc); err != nil { + return nil, err + } else { + tables = append(tables, result) + } + } + res := table.NewProfilerResult(tables...) + return flux.NewSliceResultIterator([]flux.Result{&res}), nil +} + // ID reports an ephemeral unique ID for the query. func (q *Query) ID() QueryID { return q.id @@ -572,10 +590,6 @@ func (q *Query) Results() <-chan flux.Result { return q.results } -func (q *Query) ProfilerResults() (flux.ResultIterator, error) { - return nil, nil -} - func (q *Query) recordUnusedMemory() { unused := q.c.GetUnusedMemoryBytes() q.c.metrics.memoryUnused.WithLabelValues(q.labelValues...).Set(float64(unused))