@@ -21,6 +21,7 @@ import (
21
21
"github.com/influxdata/flux/execute/executetest"
22
22
"github.com/influxdata/flux/execute/table"
23
23
"github.com/influxdata/flux/lang"
24
+ "github.com/influxdata/flux/memory"
24
25
"github.com/influxdata/flux/runtime"
25
26
"github.com/influxdata/flux/values"
26
27
"github.com/influxdata/influxdb/v2"
@@ -752,6 +753,128 @@ from(bucket: "%s")
752
753
}
753
754
}
754
755
756
+ type FluxStatisticsTestProfiler struct {}
757
+
758
+ func (s FluxStatisticsTestProfiler ) Name () string {
759
+ return "TestFluxStatistics"
760
+ }
761
+
762
+ func (s FluxStatisticsTestProfiler ) GetResult (q flux.Query , alloc * memory.Allocator ) (flux.Table , error ) {
763
+ groupKey := execute .NewGroupKey (
764
+ []flux.ColMeta {
765
+ {
766
+ Label : "_measurement" ,
767
+ Type : flux .TString ,
768
+ },
769
+ },
770
+ []values.Value {
771
+ values .NewString ("profiler/FluxStatistics" ),
772
+ },
773
+ )
774
+ b := execute .NewColListTableBuilder (groupKey , alloc )
775
+ for _ , colName := range []string {"_measurement" , "_field" , execute .DefaultValueColLabel } {
776
+ if _ , err := b .AddCol (flux.ColMeta {
777
+ Label : colName ,
778
+ Type : flux .TString ,
779
+ }); err != nil {
780
+ return nil , err
781
+ }
782
+ }
783
+ q .Statistics ().Range (func (key string , value string ) {
784
+ b .AppendString (0 , "profiler/FluxStatistics" )
785
+ b .AppendString (1 , key )
786
+ b .AppendString (2 , key )
787
+ })
788
+ b .Sort ([]string {"_field" }, false )
789
+ tbl , err := b .Table ()
790
+ if err != nil {
791
+ return nil , err
792
+ }
793
+ return tbl , nil
794
+ }
795
+
796
+ func TestFluxProfiler (t * testing.T ) {
797
+ testcases := []struct {
798
+ name string
799
+ data []string
800
+ query string
801
+ want string
802
+ }{
803
+ {
804
+ name : "range last single point start time" ,
805
+ data : []string {
806
+ "m,tag=a f=1i 1" ,
807
+ },
808
+ query : `
809
+ option profiler.enabledProfilers = ["TestFluxStatistics", "TestFluxStatistics", "NonExistent"]
810
+ from(bucket: v.bucket)
811
+ |> range(start: 1970-01-01T00:00:00.000000001Z, stop: 1970-01-01T01:00:00Z)
812
+ |> last()
813
+ ` ,
814
+ want : `
815
+ #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
816
+ #group,false,false,true,true,false,false,true,true,true
817
+ #default,_result,,,,,,,,
818
+ ,result,table,_start,_stop,_time,_value,_field,_measurement,tag
819
+ ,,0,1970-01-01T00:00:00.000000001Z,1970-01-01T01:00:00Z,1970-01-01T00:00:00.000000001Z,1,f,m,a
820
+
821
+ #datatype,string,long,string,string,string
822
+ #group,false,false,true,false,false
823
+ #default,_profiler,,,,
824
+ ,result,table,_measurement,_field,_value
825
+ ,,0,profiler/FluxStatistics,CompileDuration,CompileDuration
826
+ ,,0,profiler/FluxStatistics,Concurrency,Concurrency
827
+ ,,0,profiler/FluxStatistics,ExecuteDuration,ExecuteDuration
828
+ ,,0,profiler/FluxStatistics,MaxAllocated,MaxAllocated
829
+ ,,0,profiler/FluxStatistics,PlanDuration,PlanDuration
830
+ ,,0,profiler/FluxStatistics,QueueDuration,QueueDuration
831
+ ,,0,profiler/FluxStatistics,RequeueDuration,RequeueDuration
832
+ ,,0,profiler/FluxStatistics,RuntimeErrors,RuntimeErrors
833
+ ,,0,profiler/FluxStatistics,TotalAllocated,TotalAllocated
834
+ ,,0,profiler/FluxStatistics,TotalDuration,TotalDuration
835
+ ,,0,profiler/FluxStatistics,flux/query-plan,flux/query-plan
836
+ ,,0,profiler/FluxStatistics,influxdb/scanned-bytes,influxdb/scanned-bytes
837
+ ,,0,profiler/FluxStatistics,influxdb/scanned-values,influxdb/scanned-values
838
+ ` ,
839
+ },
840
+ }
841
+ execute .RegisterProfilers (& FluxStatisticsTestProfiler {})
842
+ for _ , tc := range testcases {
843
+ tc := tc
844
+ t .Run (tc .name , func (t * testing.T ) {
845
+ l := launcher .RunTestLauncherOrFail (t , ctx , nil )
846
+
847
+ l .SetupOrFail (t )
848
+ defer l .ShutdownOrFail (t , ctx )
849
+
850
+ l .WritePointsOrFail (t , strings .Join (tc .data , "\n " ))
851
+
852
+ queryStr := "import \" profiler\" \n v = {bucket: " + "\" " + l .Bucket .Name + "\" " + "}\n " + tc .query
853
+ req := & query.Request {
854
+ Authorization : l .Auth ,
855
+ OrganizationID : l .Org .ID ,
856
+ Compiler : lang.FluxCompiler {
857
+ Query : queryStr ,
858
+ },
859
+ }
860
+ if got , err := l .FluxQueryService ().Query (ctx , req ); err != nil {
861
+ t .Error (err )
862
+ } else {
863
+ dec := csv .NewMultiResultDecoder (csv.ResultDecoderConfig {})
864
+ want , err := dec .Decode (ioutil .NopCloser (strings .NewReader (tc .want )))
865
+ if err != nil {
866
+ t .Fatal (err )
867
+ }
868
+ defer want .Release ()
869
+
870
+ if err := executetest .EqualResultIterators (want , got ); err != nil {
871
+ t .Fatal (err )
872
+ }
873
+ }
874
+ })
875
+ }
876
+ }
877
+
755
878
func TestQueryPushDowns (t * testing.T ) {
756
879
testcases := []struct {
757
880
name string
0 commit comments