[FEAT] Add left/right/anti/semi joins to native executor #2743
CodSpeed Performance Report
Merging #2743 will degrade performance by 46.77%.
5ba0d86 to b1b36cf
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2743      +/-   ##
==========================================
+ Coverage   65.27%   65.98%   +0.71%
==========================================
  Files        1005     1008       +3
  Lines      113099   113417     +318
==========================================
+ Hits        73820    74836    +1016
+ Misses      39279    38581     -698
pub enum ProbeTable {
    WithIdx(ProbeTableWithIdx),
    WithoutIdx(ProbeTableWithoutIdx),
}
@samster25 Gonna keep thinking on this but wanted to ask if you have any opinions on how to allow for probe tables with different values? Right now I have this as an enum at the probe table level, but it's not the best as there's now quite a bit of duplicate code, and on the prober side it opens up more room for error as it now has to get the appropriate probe table type based on the join type.
Yeah, I'm not a fan of this either. Here's what I'm thinking as an alternative.
Since the API for building (add_table) will be the same, we can factor that into a trait.
fn make_probe_table_builder(schema: SchemaRef, track_indices: bool) -> Box<dyn ProbeTableBuilder> {}
pub trait ProbeTableBuilder {
    fn add_table(&mut self, table: &Table) -> DaftResult<()>;
    fn build(&mut self) -> Box<dyn Probeable>;
}
// Both methods should work if we are tracking indices; if tracking is disabled, only the latter should work.
pub trait Probeable {
    fn probe_indices(&self, table: &Table) -> Iterator<...>;
    fn probe_exists(&self, table: &Table) -> Iterator<bool>;
}
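To make the proposal above concrete, here is a minimal, self-contained sketch of the builder/prober split. It is not the Daft implementation: keys are plain i64 values instead of Table rows, the schema argument is dropped, probe results are collected into Vecs rather than returned as iterators, and the struct names (IndexedTable, KeySet, IndexedBuilder, SetBuilder) are hypothetical.

```rust
use std::collections::{HashMap, HashSet};

/// Probe side of the API. Keys are plain i64 values to keep the sketch small.
pub trait Probeable {
    /// For each probe key, the matching build-side row indices.
    /// Fails when the table was built without index tracking.
    fn probe_indices(&self, keys: &[i64]) -> Result<Vec<Vec<usize>>, String>;
    /// For each probe key, whether any build-side row matches.
    fn probe_exists(&self, keys: &[i64]) -> Vec<bool>;
}

/// Build side of the API: the same add_table call for both variants.
pub trait ProbeTableBuilder {
    fn add_table(&mut self, keys: &[i64]);
    fn build(self: Box<Self>) -> Box<dyn Probeable>;
}

// Hypothetical concrete types: one tracks row indices, one only tracks keys.
struct IndexedTable(HashMap<i64, Vec<usize>>);
struct KeySet(HashSet<i64>);

impl Probeable for IndexedTable {
    fn probe_indices(&self, keys: &[i64]) -> Result<Vec<Vec<usize>>, String> {
        Ok(keys.iter().map(|k| self.0.get(k).cloned().unwrap_or_default()).collect())
    }
    fn probe_exists(&self, keys: &[i64]) -> Vec<bool> {
        keys.iter().map(|k| self.0.contains_key(k)).collect()
    }
}

impl Probeable for KeySet {
    fn probe_indices(&self, _keys: &[i64]) -> Result<Vec<Vec<usize>>, String> {
        Err("probe_indices requires a builder created with track_indices = true".to_string())
    }
    fn probe_exists(&self, keys: &[i64]) -> Vec<bool> {
        keys.iter().map(|k| self.0.contains(k)).collect()
    }
}

struct IndexedBuilder { map: HashMap<i64, Vec<usize>>, next_row: usize }
struct SetBuilder { set: HashSet<i64> }

impl ProbeTableBuilder for IndexedBuilder {
    fn add_table(&mut self, keys: &[i64]) {
        for k in keys {
            // Row indices are global across all added tables.
            self.map.entry(*k).or_default().push(self.next_row);
            self.next_row += 1;
        }
    }
    fn build(self: Box<Self>) -> Box<dyn Probeable> { Box::new(IndexedTable(self.map)) }
}

impl ProbeTableBuilder for SetBuilder {
    fn add_table(&mut self, keys: &[i64]) { self.set.extend(keys.iter().copied()); }
    fn build(self: Box<Self>) -> Box<dyn Probeable> { Box::new(KeySet(self.set)) }
}

pub fn make_probe_table_builder(track_indices: bool) -> Box<dyn ProbeTableBuilder> {
    if track_indices {
        Box::new(IndexedBuilder { map: HashMap::new(), next_row: 0 })
    } else {
        Box::new(SetBuilder { set: HashSet::new() })
    }
}
```

In this sketch build consumes the boxed builder (self: Box<Self>) instead of taking &mut self, so the builder cannot be reused after the table is produced; that is a design choice of the sketch, not something the comment above prescribes.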
Left an initial review with some ideas on how to organize the code better. LMK what you think.
pub fn probe<'a>(
    &'a self,
    right: &'a Table,
) -> DaftResult<impl Iterator<Item = (u64, bool)> + 'a> {
I don't think you need to return the index here, since it should map 1:1 with the right input.
    num_rows: usize,
}

impl ProbeTableWithoutIdx {
It may make more sense to call this a ProbeSet rather than ProbeTableWithoutIdx.
Largely, it looks super clean!
I'm wondering why you don't just create a separate intermediate op and sink for the anti and semi hash joins. I think that might actually reduce the amount of code, since we would not need the boilerplate for Probeable. The Probeable trait doesn't seem to be a great abstraction to me, since the two things that implement it implement different subsets of the methods, and ProbeTable::probe_exists is unused. Moreover, having separate ops means that we can enforce via types that anti/semi joins take in a ProbeSet and the other joins take in a ProbeTable.
Another comment about that ^ is that the inner and left/right joins have some logic for creating the final table by taking the appropriate columns from the left and right sides, which can be shared better if anti/semi joins are separated out.
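A rough sketch of what "enforce via types" could look like, assuming simplified i64 keys. The operator names InnerProbeOperator and AntiSemiProbeOperator are hypothetical and the real operators work on tables; the point is only that each operator owns exactly the structure it is allowed to probe.

```rust
use std::collections::{HashMap, HashSet};

/// Maps each build-side key to the build-side rows that hold it.
pub struct ProbeTable {
    rows_by_key: HashMap<i64, Vec<usize>>,
}

/// Records only which keys exist on the build side.
pub struct ProbeSet {
    keys: HashSet<i64>,
}

/// Hypothetical operator for joins that need matching row indices.
pub struct InnerProbeOperator {
    probe_table: ProbeTable,
}

/// Hypothetical operator for anti/semi joins, which only need existence.
pub struct AntiSemiProbeOperator {
    probe_set: ProbeSet,
    is_semi: bool,
}

impl InnerProbeOperator {
    pub fn new(build_keys: &[i64]) -> Self {
        let mut rows_by_key: HashMap<i64, Vec<usize>> = HashMap::new();
        for (row, key) in build_keys.iter().enumerate() {
            rows_by_key.entry(*key).or_default().push(row);
        }
        Self { probe_table: ProbeTable { rows_by_key } }
    }

    /// Returns (build_row, probe_row) pairs for every match.
    pub fn execute(&self, input: &[i64]) -> Vec<(usize, usize)> {
        let mut out = Vec::new();
        for (probe_row, key) in input.iter().enumerate() {
            if let Some(rows) = self.probe_table.rows_by_key.get(key) {
                out.extend(rows.iter().map(|&build_row| (build_row, probe_row)));
            }
        }
        out
    }
}

impl AntiSemiProbeOperator {
    pub fn new(build_keys: &[i64], is_semi: bool) -> Self {
        let keys = build_keys.iter().copied().collect();
        Self { probe_set: ProbeSet { keys }, is_semi }
    }

    /// Semi keeps probe rows with a match; anti keeps those without one.
    pub fn execute(&self, input: &[i64]) -> Vec<i64> {
        input
            .iter()
            .copied()
            .filter(|key| self.probe_set.keys.contains(key) == self.is_semi)
            .collect()
    }
}
```

With this split there is no method that one of the two structures has to leave unimplemented, which is the concern raised about Probeable.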
let (build_on, probe_on, build_child, probe_child, build_on_left) = match join_type {
    JoinType::Inner => (left_on, right_on, left, right, true),
    JoinType::Right => (left_on, right_on, left, right, true),
    JoinType::Left | JoinType::Anti | JoinType::Semi => {
        (right_on, left_on, right, left, false)
    }
    JoinType::Outer => {
        unimplemented!("Outer join not supported yet");
    }
};
Small nit, but it might be cleaner to just have a match for build_on_left and then set build_on, build_child, etc. based on the build_on_left value. Also, I would separate the left join type match from the anti and semi ones for readability.
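The suggested restructuring could look roughly like the sketch below. It is a standalone illustration: the JoinType enum is redeclared locally without Outer (which the snippet under review leaves unimplemented), plain string slices stand in for the real expression and plan types, and JoinSides and choose_sides are hypothetical names.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Anti,
    Semi,
}

#[derive(Debug, PartialEq, Eq)]
pub struct JoinSides<'a> {
    pub build_on: &'a str,
    pub probe_on: &'a str,
    pub build_child: &'a str,
    pub probe_child: &'a str,
    pub build_on_left: bool,
}

pub fn choose_sides<'a>(
    join_type: JoinType,
    left_on: &'a str,
    right_on: &'a str,
    left: &'a str,
    right: &'a str,
) -> JoinSides<'a> {
    // Step 1: decide only which side is the build side.
    let build_on_left = match join_type {
        JoinType::Inner => true,
        JoinType::Right => true,
        JoinType::Left => false,
        // Kept separate from Left for readability, as suggested.
        JoinType::Anti | JoinType::Semi => false,
    };
    // Step 2: derive everything else from that single flag.
    let (build_on, probe_on, build_child, probe_child) = if build_on_left {
        (left_on, right_on, left, right)
    } else {
        (right_on, left_on, right, left)
    };
    JoinSides { build_on, probe_on, build_child, probe_child, build_on_left }
}
```

Deriving the four values from one flag means the left/right swap is written once, so a new join type only needs one new match arm.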
) -> DaftResult<Arc<MicroPartition>> {
    if let HashJoinProbeState::ReadyToProbe(probe_table, tables) = self {
        let _growables = info_span!("HashJoinOperator::build_growables").entered();

        // Left should only be created once per probe table
        let mut left_growable =
        let mut build_side_growable =
I'm curious what the performance difference is between using a growable vs storing indices in a vec and then doing table.take(idx). I'm not sure why one would be faster than another, just wondering if you knew.
Also why are the capacities set to 20 initially? Especially for the left and right joins, where we know at least how large the tables will be.
Haven't actually tested the performance, but my theory is that the growable will have a smaller memory footprint than doing a take. The reason is that with the take method we'd have to do a concat first, which could be quite costly, whereas with the growable we can elide this concat and leave the Vec as is.
We should definitely test this, but since we already see pretty good performance with the growable method, it's probably not a huge priority.
And yes, you're right, we should set the capacity for left/right joins.
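The trade-off discussed in this thread can be illustrated with plain vectors. This is only an analogy under stated assumptions: it does not use Daft's growable or take APIs, each chunk is a Vec<i64> standing in for one build-side table, and both function names are hypothetical.

```rust
/// "take" style: concatenate every chunk first, then pick rows by global index.
/// The concatenation copies all build-side rows, whether or not they match.
pub fn take_after_concat(chunks: &[Vec<i64>], global_idx: &[usize]) -> Vec<i64> {
    let all: Vec<i64> = chunks.concat();
    global_idx.iter().map(|&i| all[i]).collect()
}

/// "growable" style: copy matching rows straight out of the chunk they live in,
/// addressed by (table_idx, row_idx), so no concatenated copy is needed.
/// `capacity` pre-sizes the output; for left/right joins a lower bound is known
/// up front because every probe row produces at least one output row.
pub fn gather_from_chunks(
    chunks: &[Vec<i64>],
    idx: &[(usize, usize)],
    capacity: usize,
) -> Vec<i64> {
    let mut out = Vec::with_capacity(capacity);
    for &(table_idx, row_idx) in idx {
        out.push(chunks[table_idx][row_idx]);
    }
    out
}
```

Both functions return the same rows; the difference is the intermediate copy made by concat, which is the cost the reply above expects the growable approach to avoid. As noted in the thread, this has not been measured.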
    })
}

fn probe<'a>(&'a self, right: &'a Table) -> DaftResult<impl Iterator<Item = bool> + 'a> {
Rename right to input to match the join op naming convention.
h == other.hash && {
    let l_idx = other.idx;
    let l_table_idx = (l_idx >> Self::TABLE_IDX_SHIFT) as usize;
    let l_row_idx = (l_idx & Self::LOWER_MASK) as usize;

    let l_table = self.tables.get(l_table_idx).unwrap();

    let left_refs = l_table.0.as_slice();

    (self.compare_fn)(left_refs, &right_arrays, l_row_idx, r_idx).is_eq()
There's some kind of complicated logic here that's also repeated in add_table. It would be nice to have a better abstraction over it.
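One possible abstraction is a small newtype that owns the packing and unpacking of the table index and row index, so that the shift and mask appear in one place. This is a sketch only: PackedIdx is a hypothetical name, and the shift width of 36 bits is an assumption chosen for illustration, not the value used in the PR.

```rust
/// A build-side row address packed into one u64:
/// the upper bits hold the table index, the lower bits hold the row index.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PackedIdx(u64);

impl PackedIdx {
    // Assumed width for this sketch; the real constant may differ.
    const TABLE_IDX_SHIFT: u32 = 36;
    const LOWER_MASK: u64 = (1 << Self::TABLE_IDX_SHIFT) - 1;

    /// Packs a (table, row) pair. The row index must fit in the lower bits.
    pub fn new(table_idx: usize, row_idx: usize) -> Self {
        debug_assert!((row_idx as u64) <= Self::LOWER_MASK);
        PackedIdx(((table_idx as u64) << Self::TABLE_IDX_SHIFT) | row_idx as u64)
    }

    pub fn table_idx(self) -> usize {
        (self.0 >> Self::TABLE_IDX_SHIFT) as usize
    }

    pub fn row_idx(self) -> usize {
        (self.0 & Self::LOWER_MASK) as usize
    }
}
```

With such a type, both add_table and the probe comparison would call PackedIdx::new, table_idx and row_idx instead of repeating the bit arithmetic.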
JoinType::Inner => state.probe_inner(
    input,
    &self.probe_on,
    &self.common_join_keys,
    &self.left_non_join_columns,
    &self.right_non_join_columns,
    self.build_on_left,
),
JoinType::Left | JoinType::Right => state.probe_left_right(
    input,
    &self.probe_on,
    &self.common_join_keys,
    &self.left_non_join_columns,
    &self.right_non_join_columns,
    self.join_type == JoinType::Left,
),
JoinType::Semi | JoinType::Anti => state.probe_anti_semi(
    input,
    &self.probe_on,
    self.join_type == JoinType::Semi,
),
JoinType::Outer => {
    unimplemented!("Outer join is not yet implemented")
}
Not important but why do we have the actual probing logic inside of the state instead of the operator?
Not much of a compelling reason. I think it's safe to move the logic to the operator side and keep the state free of logic.
Makes sense, will separate the anti/semi joins out. On |
Reviewed the probeable code and that looks good! Seems like @kevinzwang already reviewed the local-execution code, which also doesn't look that different from before.
}

fn probe_indices<'a>(&'a self, _right: &'a Table) -> DaftResult<IndicesMapper<'a>> {
    panic!("Not implemented")
We should throw a more informative error message!
Also adds some functionality to the benchmarking code for manual runs. Blocked by #2743 for compatibility with the new executor.
Implement left/right/anti/semi joins in the native executor.
Left/Right: Build probe table on the opposite side, e.g. for left joins build the probe table on the right side. During the probing phase, if there is no match, add a null row.
Anti/Semi: Build probe table on the right side. During the probing phase, emit for anti if there is no match, vice versa for semi.
Running

DAFT_ENABLE_NATIVE_EXECUTOR=1 pytest tests/dataframe/test_joins.py
with SMJ/broadcast + Outer joins skipped.
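The left-join behaviour described above (build on the right side, emit a null row when a probe row has no match) can be sketched with standard collections. This is an illustration under simplifying assumptions, not the executor code: rows are (key, value) tuples, None stands in for the null row, and left_join is a hypothetical helper.

```rust
use std::collections::HashMap;

/// Left join over (key, value) rows.
/// The probe table is built on the right side; every left row is probed and
/// emitted at least once, with None for the right value when nothing matches.
pub fn left_join(
    left: &[(i64, &str)],
    right: &[(i64, &str)],
) -> Vec<(i64, String, Option<String>)> {
    // Build phase: key -> indices of right rows holding that key.
    let mut probe_table: HashMap<i64, Vec<usize>> = HashMap::new();
    for (row, (key, _)) in right.iter().enumerate() {
        probe_table.entry(*key).or_default().push(row);
    }

    // Probe phase: one output row per match, or a single null-padded row.
    let mut out = Vec::new();
    for (key, left_value) in left {
        match probe_table.get(key) {
            Some(rows) => {
                for &r in rows {
                    out.push((*key, left_value.to_string(), Some(right[r].1.to_string())));
                }
            }
            None => out.push((*key, left_value.to_string(), None)),
        }
    }
    out
}
```

A right join is the mirror image: build on the left side and probe with the right rows.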