Skip to content

Commit 5c330a4

Browse files
ethanyzhangChristopher Wolff
authored and
Christopher Wolff
committed
feat: flux query profiler (#19359)
1 parent 4aaecf9 commit 5c330a4

File tree

1 file changed

+190
-1
lines changed

1 file changed

+190
-1
lines changed

cmd/influxd/launcher/query_test.go

+190-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"github.com/influxdata/flux/execute"
2121
"github.com/influxdata/flux/execute/executetest"
2222
"github.com/influxdata/flux/lang"
23+
"github.com/influxdata/flux/memory"
24+
"github.com/influxdata/flux/runtime"
2325
"github.com/influxdata/flux/values"
2426
"github.com/influxdata/influxdb/v2"
2527
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
@@ -221,7 +223,7 @@ func queryPoints(ctx context.Context, t *testing.T, l *launcher.TestLauncher, op
221223
if d.verbose {
222224
t.Logf("query:\n%s", qs)
223225
}
224-
pkg, err := flux.Parse(qs)
226+
pkg, err := runtime.ParseToJSON(qs)
225227
if err != nil {
226228
t.Fatal(err)
227229
}
@@ -751,6 +753,193 @@ from(bucket: "%s")
751753
}
752754
}
753755

756+
type TestQueryProfiler struct{
757+
start int64
758+
}
759+
760+
func (s TestQueryProfiler) Name() string {
761+
return fmt.Sprintf("query%d", s.start)
762+
}
763+
764+
func (s TestQueryProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) {
765+
groupKey := execute.NewGroupKey(
766+
[]flux.ColMeta{
767+
{
768+
Label: "_measurement",
769+
Type: flux.TString,
770+
},
771+
},
772+
[]values.Value{
773+
values.NewString(fmt.Sprintf("profiler/query%d", s.start)),
774+
},
775+
)
776+
b := execute.NewColListTableBuilder(groupKey, alloc)
777+
colMeta := []flux.ColMeta{
778+
{
779+
Label: "_measurement",
780+
Type: flux.TString,
781+
},
782+
{
783+
Label: "TotalDuration",
784+
Type: flux.TInt,
785+
},
786+
{
787+
Label: "CompileDuration",
788+
Type: flux.TInt,
789+
},
790+
{
791+
Label: "QueueDuration",
792+
Type: flux.TInt,
793+
},
794+
{
795+
Label: "PlanDuration",
796+
Type: flux.TInt,
797+
},
798+
{
799+
Label: "RequeueDuration",
800+
Type: flux.TInt,
801+
},
802+
{
803+
Label: "ExecuteDuration",
804+
Type: flux.TInt,
805+
},
806+
{
807+
Label: "Concurrency",
808+
Type: flux.TInt,
809+
},
810+
{
811+
Label: "MaxAllocated",
812+
Type: flux.TInt,
813+
},
814+
{
815+
Label: "TotalAllocated",
816+
Type: flux.TInt,
817+
},
818+
{
819+
Label: "RuntimeErrors",
820+
Type: flux.TString,
821+
},
822+
{
823+
Label: "influxdb/scanned-bytes",
824+
Type: flux.TInt,
825+
},
826+
{
827+
Label: "influxdb/scanned-values",
828+
Type: flux.TInt,
829+
},
830+
{
831+
Label: "flux/query-plan",
832+
Type: flux.TString,
833+
},
834+
}
835+
colData := []interface{} {
836+
fmt.Sprintf("profiler/query%d", s.start),
837+
s.start,
838+
s.start + 1,
839+
s.start + 2,
840+
s.start + 3,
841+
s.start + 4,
842+
s.start + 5,
843+
s.start + 6,
844+
s.start + 7,
845+
s.start + 8,
846+
"error1\nerror2",
847+
s.start + 9,
848+
s.start + 10,
849+
"query plan",
850+
}
851+
for _, col := range colMeta {
852+
if _, err := b.AddCol(col); err != nil {
853+
return nil, err
854+
}
855+
}
856+
for i := 0; i < len(colData); i++ {
857+
if intValue, ok := colData[i].(int64); ok {
858+
b.AppendInt(i, intValue)
859+
} else {
860+
b.AppendString(i, colData[i].(string))
861+
}
862+
}
863+
tbl, err := b.Table()
864+
if err != nil {
865+
return nil, err
866+
}
867+
return tbl, nil
868+
}
869+
870+
func TestFluxProfiler(t *testing.T) {
871+
testcases := []struct {
872+
name string
873+
data []string
874+
query string
875+
want string
876+
}{
877+
{
878+
name: "range last single point start time",
879+
data: []string{
880+
"m,tag=a f=1i 1",
881+
},
882+
query: `
883+
option profiler.enabledProfilers = ["query0", "query100", "query100", "NonExistentProfiler"]
884+
from(bucket: v.bucket)
885+
|> range(start: 1970-01-01T00:00:00.000000001Z, stop: 1970-01-01T01:00:00Z)
886+
|> last()
887+
`,
888+
want: `
889+
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
890+
#group,false,false,true,true,false,false,true,true,true
891+
#default,_result,,,,,,,,
892+
,result,table,_start,_stop,_time,_value,_field,_measurement,tag
893+
,,0,1970-01-01T00:00:00.000000001Z,1970-01-01T01:00:00Z,1970-01-01T00:00:00.000000001Z,1,f,m,a
894+
895+
#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,string,long,long
896+
#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false
897+
#default,_profiler,,,,,,,,,,,,,,,
898+
,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values
899+
,,0,profiler/query0,0,1,2,3,4,5,6,7,8,"error1
900+
error2","query plan",9,10
901+
,,1,profiler/query100,100,101,102,103,104,105,106,107,108,"error1
902+
error2","query plan",109,110
903+
`,
904+
},
905+
}
906+
execute.RegisterProfilers(&TestQueryProfiler{}, &TestQueryProfiler{start: 100})
907+
for _, tc := range testcases {
908+
tc := tc
909+
t.Run(tc.name, func(t *testing.T) {
910+
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
911+
912+
l.SetupOrFail(t)
913+
defer l.ShutdownOrFail(t, ctx)
914+
915+
l.WritePointsOrFail(t, strings.Join(tc.data, "\n"))
916+
917+
queryStr := "import \"profiler\"\nv = {bucket: " + "\"" + l.Bucket.Name + "\"" + "}\n" + tc.query
918+
req := &query.Request{
919+
Authorization: l.Auth,
920+
OrganizationID: l.Org.ID,
921+
Compiler: lang.FluxCompiler{
922+
Query: queryStr,
923+
},
924+
}
925+
if got, err := l.FluxQueryService().Query(ctx, req); err != nil {
926+
t.Error(err)
927+
} else {
928+
dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
929+
want, err := dec.Decode(ioutil.NopCloser(strings.NewReader(tc.want)))
930+
if err != nil {
931+
t.Fatal(err)
932+
}
933+
defer want.Release()
934+
935+
if err := executetest.EqualResultIterators(want, got); err != nil {
936+
t.Fatal(err)
937+
}
938+
}
939+
})
940+
}
941+
}
942+
754943
func TestQueryPushDowns(t *testing.T) {
755944
t.Skip("Not supported yet")
756945
testcases := []struct {

0 commit comments

Comments
 (0)