Skip to content

Commit

Permalink
Convert pinot query to use unix milliseconds instead of nano (#5650)
Browse files Browse the repository at this point in the history
* Convert pinot query to use unix milliseconds instead of nano
  • Loading branch information
neil-xie authored Feb 9, 2024
1 parent eec8f6d commit 9b15cd1
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
73 changes: 73 additions & 0 deletions common/pinot/pinotQueryValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package pinot
import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/xwb1989/sqlparser"
Expand All @@ -40,6 +41,13 @@ type VisibilityQueryValidator struct {
validSearchAttributes map[string]interface{}
}

var timeSystemKeys = map[string]bool{
"StartTime": true,
"CloseTime": true,
"ExecutionTime": true,
"UpdateTime": true,
}

// NewPinotQueryValidator create VisibilityQueryValidator
func NewPinotQueryValidator(validSearchAttributes map[string]interface{}) *VisibilityQueryValidator {
return &VisibilityQueryValidator{
Expand Down Expand Up @@ -120,6 +128,22 @@ func (qv *VisibilityQueryValidator) validateRangeExpr(expr sqlparser.Expr) (stri
}

if definition.IsSystemIndexedKey(colNameStr) {
if _, ok = timeSystemKeys[colNameStr]; ok {
if lowerBound, ok := rangeCond.From.(*sqlparser.SQLVal); ok {
trimmed, err := trimTimeFieldValueFromNanoToMilliSeconds(lowerBound)
if err != nil {
return "", err
}
rangeCond.From = trimmed
}
if upperBound, ok := rangeCond.To.(*sqlparser.SQLVal); ok {
trimmed, err := trimTimeFieldValueFromNanoToMilliSeconds(upperBound)
if err != nil {
return "", err
}
rangeCond.To = trimmed
}
}
expr.Format(buf)
return buf.String(), nil
}
Expand Down Expand Up @@ -217,6 +241,18 @@ func (qv *VisibilityQueryValidator) processSystemKey(expr sqlparser.Expr) (strin
colNameStr := colName.Name.String()

if comparisonExpr.Operator != sqlparser.EqualStr {
if _, ok := timeSystemKeys[colNameStr]; ok {
sqlVal, ok := comparisonExpr.Right.(*sqlparser.SQLVal)
if !ok {
return "", fmt.Errorf("error: Failed to convert val")
}
trimmed, err := trimTimeFieldValueFromNanoToMilliSeconds(sqlVal)
if err != nil {
return "", err
}
comparisonExpr.Right = trimmed
}

expr.Format(buf)
return buf.String(), nil
}
Expand Down Expand Up @@ -248,6 +284,18 @@ func (qv *VisibilityQueryValidator) processSystemKey(expr sqlparser.Expr) (strin
Name: sqlparser.NewColIdent(newColVal),
Qualifier: colName.Qualifier,
}
} else {
if _, ok := timeSystemKeys[colNameStr]; ok {
sqlVal, ok := comparisonExpr.Right.(*sqlparser.SQLVal)
if !ok {
return "", fmt.Errorf("error: Failed to convert val")
}
trimmed, err := trimTimeFieldValueFromNanoToMilliSeconds(sqlVal)
if err != nil {
return "", err
}
comparisonExpr.Right = trimmed
}
}

// For this branch, we still have a sqlExpr type. So need to use a buf to return the string
Expand Down Expand Up @@ -326,3 +374,28 @@ func processCustomString(comparisonExpr *sqlparser.ComparisonExpr, colNameStr st
return fmt.Sprintf("(JSON_MATCH(Attr, '\"$.%s\" is not null') "+
"AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.%s', 'string'), '%s*'))", colNameStr, colNameStr, colValStr)
}

func trimTimeFieldValueFromNanoToMilliSeconds(original *sqlparser.SQLVal) (*sqlparser.SQLVal, error) {
// Convert the SQLVal to a string
valStr := string(original.Val)
// Convert to an integer
valInt, err := strconv.ParseInt(valStr, 10, 64)
if err != nil {
return original, fmt.Errorf("error: failed to parse int from SQLVal %s", valStr)
}

var newVal int64
if valInt < 0 { //exclude open workflow which time field will be -1
newVal = valInt
} else if len(valStr) > 13 { // Assuming nanoseconds if more than 13 digits
newVal = valInt / 1000000 // Convert time to milliseconds
} else {
newVal = valInt
}

// Convert the new value back to SQLVal
return &sqlparser.SQLVal{
Type: sqlparser.IntVal,
Val: []byte(strconv.FormatInt(newVal, 10)),
}, nil
}
40 changes: 40 additions & 0 deletions common/pinot/pinotQueryValidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,46 @@ func TestValidateQuery(t *testing.T) {
query: "StartTime >= 1697754674",
validated: "StartTime >= 1697754674",
},
"Case15-4: unix nano converts to milli seconds for equal statements": {
query: "StartTime = 1707319950934000128",
validated: "StartTime = 1707319950934",
},
"Case15-5: unix nano converts to milli seconds for unequal statements query": {
query: "StartTime > 1707319950934000128",
validated: "StartTime > 1707319950934",
},
"Case15-6: open workflows": {
query: "CloseTime = -1",
validated: "CloseTime = -1",
},
"Case15-7: startTime for range query": {
query: "StartTime BETWEEN 1707319950934000128 AND 1707319950935000128",
validated: "StartTime between 1707319950934 and 1707319950935",
},
"Case15-8: invalid string for trim": {
query: "CloseTime = abc",
validated: "",
err: "error: failed to convert val",
},
"Case15-9: invalid value for trim": {
query: "CloseTime = 123.45",
validated: "",
err: "error: failed to parse int from SQLVal 123.45",
},
"Case15-10: invalid from time for range query": {
query: "StartTime BETWEEN 17.50 AND 1707319950935000128",
validated: "",
err: "error: failed to parse int from SQLVal 17.50",
},
"Case15-11: invalid to time for range query": {
query: "StartTime BETWEEN 1707319950934000128 AND 1707319950935000128.1",
validated: "",
err: "error: failed to parse int from SQLVal 1707319950935000128.1",
},
"Case15-12: value already in milliseconds": {
query: "StartTime = 170731995093",
validated: "StartTime = 170731995093",
},
"Case16-1: custom int attribute greater than or equal to": {
query: "CustomIntField >= 0",
validated: "(JSON_MATCH(Attr, '\"$.CustomIntField\" is not null') AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) >= 0)",
Expand Down

0 comments on commit 9b15cd1

Please sign in to comment.