Skip to content

Commit

Permalink
feat: Add index join conditions (#12461)
Browse files Browse the repository at this point in the history
Summary:

Change index join condition from SQL expression to a more strict condition structure
to present in query plan node

Differential Revision: D70233337
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Feb 27, 2025
1 parent 223777e commit 7507a1d
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 94 deletions.
10 changes: 7 additions & 3 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ namespace facebook::velox::core {
class ITypedExpr;
}

namespace facebook::velox::core {
struct IndexJoinCondition;
}

namespace facebook::velox::connector {

class DataSource;
Expand Down Expand Up @@ -584,8 +588,8 @@ class Connector {
/// Here,
/// - 'inputType' is ROW{t.sid, t.event_list}
/// - 'numJoinKeys' is 1 since only t.sid is used in join equi-clauses.
/// - 'joinConditions' is list of one expression: contains(t.event_list,
/// u.event_type)
/// - 'joinConditions' specifies the join condition: contains(t.event_list,
/// u.event_type)
/// - 'outputType' is ROW{u.event_value}
/// - 'tableHandle' specifies the metadata of the index table.
/// - 'columnHandles' is a map from 'u.event_type' (in 'joinConditions') and
Expand All @@ -596,7 +600,7 @@ class Connector {
virtual std::shared_ptr<IndexSource> createIndexSource(
const RowTypePtr& inputType,
size_t numJoinKeys,
const std::vector<std::shared_ptr<const core::ITypedExpr>>&
const std::vector<std::shared_ptr<core::IndexJoinCondition>>&
joinConditions,
const RowTypePtr& outputType,
const std::shared_ptr<ConnectorTableHandle>& tableHandle,
Expand Down
78 changes: 73 additions & 5 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,35 @@ std::vector<PlanNodePtr> deserializeSources(
return {};
}

std::vector<TypedExprPtr> deserializeJoinConditions(
namespace {
IndexJoinConditionPtr createIndexJoinCondition(
const folly::dynamic& obj,
void* context) {
VELOX_USER_CHECK_EQ(obj.count("type"), 1);
if (obj["type"] == "in") {
return InIndexJoinCondition::create(obj, context);
}
if (obj["type"] == "between") {
return BetweenIndexJoinCondition::create(obj, context);
}
VELOX_USER_FAIL(
"Unknown index join condition type {}", obj["type"].asString());
}
} // namespace

std::vector<IndexJoinConditionPtr> deserializeJoinConditions(
const folly::dynamic& obj,
void* context) {
if (obj.count("joinConditions") == 0) {
return {};
}

return ISerializable::deserialize<std::vector<ITypedExpr>>(
obj["joinConditions"], context);
std::vector<IndexJoinConditionPtr> joinConditions;
joinConditions.reserve(obj.count("joinConditions"));
for (const auto& joinCondition : obj["joinConditions"]) {
joinConditions.push_back(createIndexJoinCondition(joinCondition, context));
}
return joinConditions;
}

PlanNodePtr deserializeSingleSource(const folly::dynamic& obj, void* context) {
Expand Down Expand Up @@ -1447,8 +1467,7 @@ PlanNodePtr IndexLookupJoinNode::create(
folly::dynamic IndexLookupJoinNode::serialize() const {
auto obj = serializeBase();
if (!joinConditions_.empty()) {
folly::dynamic serializedJoins = folly::dynamic::array;
serializedJoins.reserve(joinConditions_.size());
folly::dynamic serializedJoins = folly::dynamic::array();
for (const auto& joinCondition : joinConditions_) {
serializedJoins.push_back(joinCondition->serialize());
}
Expand Down Expand Up @@ -2881,4 +2900,53 @@ PlanNodePtr FilterNode::create(const folly::dynamic& obj, void* context) {
deserializePlanNodeId(obj), filter, std::move(source));
}

folly::dynamic IndexJoinCondition::serialize() const {
folly::dynamic obj = folly::dynamic::object;
obj["key"] = key->serialize();
return obj;
}

folly::dynamic InIndexJoinCondition::serialize() const {
folly::dynamic obj = IndexJoinCondition::serialize();
obj["type"] = "in";
obj["in"] = list->serialize();
return obj;
}

std::string InIndexJoinCondition::toString() const {
return fmt::format("{} IN {}", key->toString(), list->toString());
}

IndexJoinConditionPtr InIndexJoinCondition::create(
const folly::dynamic& obj,
void* context) {
return std::make_shared<InIndexJoinCondition>(
ISerializable::deserialize<FieldAccessTypedExpr>(obj["key"], context),
ISerializable::deserialize<FieldAccessTypedExpr>(obj["in"], context));
}

folly::dynamic BetweenIndexJoinCondition::serialize() const {
folly::dynamic obj = IndexJoinCondition::serialize();
obj["type"] = "between";
obj["lower"] = lower->serialize();
obj["upper"] = upper->serialize();
return obj;
}

std::string BetweenIndexJoinCondition::toString() const {
return fmt::format(
"{} BETWEEN {} AND {}",
key->toString(),
lower->toString(),
upper->toString());
}

IndexJoinConditionPtr BetweenIndexJoinCondition::create(
const folly::dynamic& obj,
void* context) {
return std::make_shared<BetweenIndexJoinCondition>(
ISerializable::deserialize<FieldAccessTypedExpr>(obj["key"], context),
ISerializable::deserialize<ITypedExpr>(obj["lower"], context),
ISerializable::deserialize<ITypedExpr>(obj["upper"], context));
}
} // namespace facebook::velox::core
70 changes: 65 additions & 5 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,66 @@ class MergeJoinNode : public AbstractJoinNode {
static PlanNodePtr create(const folly::dynamic& obj, void* context);
};

struct IndexJoinCondition;
using IndexJoinConditionPtr = std::shared_ptr<IndexJoinCondition>;
struct IndexJoinCondition : public ISerializable {
/// References to an index table column.
FieldAccessTypedExprPtr key;

IndexJoinCondition(FieldAccessTypedExprPtr _key) : key(std::move(_key)) {}

folly::dynamic serialize() const override;

virtual std::string toString() const = 0;
};

/// Represents IN-LIST index join condition: contains('in', 'key'). 'list' has
/// type of ARRAY(typeof('key')).
struct InIndexJoinCondition : public IndexJoinCondition {
/// References to the probe input column which is ARRAY with element type of
/// the corresponding 'lookupKey' column from index table.
FieldAccessTypedExprPtr list;

InIndexJoinCondition(
FieldAccessTypedExprPtr _key,
FieldAccessTypedExprPtr _list)
: IndexJoinCondition(std::move(_key)), list(std::move(_list)) {}

folly::dynamic serialize() const override;

std::string toString() const override;

static IndexJoinConditionPtr create(const folly::dynamic& obj, void* context);
};
using InIndexJoinConditionPtr = std::shared_ptr<InIndexJoinCondition>;

/// Represents BETWEEN index join condition: 'key' between 'lower' and 'upper'.
/// 'lower' and 'upper' have the same type of 'key'.
struct BetweenIndexJoinCondition : public IndexJoinCondition {
/// The between bound either reference to a probe input column or a constant
/// value.
///
/// NOTE: the bound is inclusive, and at least one of the bound references to
/// a probe input column.
TypedExprPtr lower;
TypedExprPtr upper;

BetweenIndexJoinCondition(
FieldAccessTypedExprPtr _key,
TypedExprPtr _lower,
TypedExprPtr _upper)
: IndexJoinCondition(std::move(_key)),
lower(std::move(_lower)),
upper(std::move(_upper)) {}

folly::dynamic serialize() const override;

std::string toString() const override;

static IndexJoinConditionPtr create(const folly::dynamic& obj, void* context);
};
using BetweenIndexJoinConditionPtr = std::shared_ptr<BetweenIndexJoinCondition>;

/// Represents index lookup join. Translates to an exec::IndexLookupJoin
/// operator. Assumes the right input is a table scan source node that provides
/// indexed table lookup for the left input with the specified join keys and
Expand Down Expand Up @@ -1796,8 +1856,8 @@ class MergeJoinNode : public AbstractJoinNode {
/// maybe some more)
/// - 'leftKeys' is a list of one key 't.sid'
/// - 'rightKeys' is a list of one key 'u.sid'
/// - 'joinConditions' is a list of one expression: contains(t.event_list,
/// u.event_type)
/// - 'joinConditions' specifies one condition: contains(t.event_list,
/// u.event_type)
/// - 'outputType' contains 3 columns : t.sid, t.day_ts, u.event_type
///
class IndexLookupJoinNode : public AbstractJoinNode {
Expand All @@ -1809,7 +1869,7 @@ class IndexLookupJoinNode : public AbstractJoinNode {
JoinType joinType,
const std::vector<FieldAccessTypedExprPtr>& leftKeys,
const std::vector<FieldAccessTypedExprPtr>& rightKeys,
const std::vector<TypedExprPtr>& joinConditions,
const std::vector<IndexJoinConditionPtr>& joinConditions,
PlanNodePtr left,
TableScanNodePtr right,
RowTypePtr outputType)
Expand Down Expand Up @@ -1849,7 +1909,7 @@ class IndexLookupJoinNode : public AbstractJoinNode {
return lookupSourceNode_;
}

const std::vector<TypedExprPtr>& joinConditions() const {
const std::vector<IndexJoinConditionPtr>& joinConditions() const {
return joinConditions_;
}

Expand All @@ -1869,7 +1929,7 @@ class IndexLookupJoinNode : public AbstractJoinNode {

const TableScanNodePtr lookupSourceNode_;

const std::vector<TypedExprPtr> joinConditions_;
const std::vector<IndexJoinConditionPtr> joinConditions_;
};

/// Represents inner/outer nested loop joins. Translates to an
Expand Down
Loading

0 comments on commit 7507a1d

Please sign in to comment.