-
Notifications
You must be signed in to change notification settings - Fork 372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix various bugs in split/apply/combine in 0.21 release #2280
Changes from all commits
6281f83
3a99848
36ea388
a2cea21
8e92887
387640c
76d7ce0
8818342
233aa52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -140,15 +140,17 @@ function groupby(df::AbstractDataFrame, cols; | |
intcols = idxcols isa Int ? [idxcols] : convert(Vector{Int}, idxcols) | ||
if isempty(intcols) | ||
return GroupedDataFrame(df, intcols, ones(Int, nrow(df)), | ||
collect(axes(df, 1)), [1], [nrow(df)], 1, nothing) | ||
collect(axes(df, 1)), [1], [nrow(df)], 1, nothing, | ||
Threads.ReentrantLock()) | ||
end | ||
sdf = df[!, intcols] | ||
|
||
groups = Vector{Int}(undef, nrow(df)) | ||
ngroups, rhashes, gslots, sorted = | ||
row_group_slots(ntuple(i -> sdf[!, i], ncol(sdf)), Val(false), groups, skipmissing) | ||
|
||
gd = GroupedDataFrame(df, intcols, groups, nothing, nothing, nothing, ngroups, nothing) | ||
gd = GroupedDataFrame(df, intcols, groups, nothing, nothing, nothing, ngroups, nothing, | ||
Threads.ReentrantLock()) | ||
|
||
# sort groups if row_group_slots hasn't already done that | ||
if sort && !sorted | ||
|
@@ -237,6 +239,8 @@ const F_ARGUMENT_RULES = | |
If the first or last argument is a function `fun`, it is passed a [`SubDataFrame`](@ref) | ||
view for each group and can return any return value defined below. | ||
Note that this form is slower than `pair` or `args` due to type instability. | ||
|
||
If `gd` has zero groups then no transformations are applied. | ||
""" | ||
|
||
const KWARG_PROCESSING_RULES = | ||
|
@@ -245,9 +249,13 @@ const KWARG_PROCESSING_RULES = | |
in addition to those generated. In this case if the returned | ||
value contains columns with the same names as the grouping columns, they are | ||
required to be equal. | ||
If `keepkeys=false` and some generated columns have the same name as grouping columns, | ||
they are kept and are not required to be equal to grouping columns. | ||
|
||
If `ungroup=true` (the default) a `DataFrame` is returned. | ||
If `ungroup=false` a `GroupedDataFrame` grouped using `keycols(gdf)` is returned. | ||
|
||
If `gd` has zero groups then no transformations are applied. | ||
""" | ||
|
||
""" | ||
|
@@ -471,7 +479,6 @@ function _combine_prepare(gd::GroupedDataFrame, | |
@nospecialize(cs::Union{Pair, typeof(nrow), | ||
ColumnIndex, MultiColumnIndex}...); | ||
keepkeys::Bool, ungroup::Bool, copycols::Bool, keeprows::Bool) | ||
@assert !isempty(cs) | ||
cs_vec = [] | ||
for p in cs | ||
if p === nrow | ||
|
@@ -548,13 +555,12 @@ function _combine_prepare(gd::GroupedDataFrame, | |
end | ||
|
||
function combine(gd::GroupedDataFrame; f...) | ||
if length(f) == 0 | ||
throw(ArgumentError("combine(gd) is not allowed, use DataFrame(gd) " * | ||
"to combine a GroupedDataFrame into a DataFrame")) | ||
if length(f) > 0 | ||
Base.depwarn("`combine(gd; target_col = source_cols => fun, ...)` is deprecated" * | ||
", use `combine(gd, source_cols => fun => :target_col, ...)` instead", | ||
:combine) | ||
end | ||
Base.depwarn("`combine(gd; target_col = source_cols => fun, ...)` is deprecated" * | ||
", use `combine(gd, source_cols => fun => :target_col, ...)` instead", | ||
:combine) | ||
# in the future handle keepkeys and ungroup | ||
return combine(gd, [source_cols => fun => out_col for (out_col, (source_cols, fun)) in f]) | ||
end | ||
|
||
|
@@ -597,33 +603,59 @@ function combine_helper(f, gd::GroupedDataFrame, | |
else | ||
newparent = parent(gd)[idx, gd.cols] | ||
end | ||
hcat!(newparent, select(valscat, Not(intersect(keys, _names(valscat))), copycols=false), | ||
hcat!(newparent, | ||
select(valscat, Not(intersect(keys, _names(valscat))), copycols=false), | ||
copycols=false) | ||
ungroup && return newparent | ||
|
||
if length(idx) == 0 | ||
if length(idx) == 0 && !(keeprows && length(keys) > 0) | ||
@assert nrow(newparent) == 0 | ||
return GroupedDataFrame(newparent, collect(1:length(gd.cols)), Int[], | ||
Int[], Int[], Int[], 0, Dict{Any,Int}()) | ||
end | ||
if keeprows | ||
Int[], Int[], Int[], 0, Dict{Any,Int}(), | ||
Threads.ReentrantLock()) | ||
elseif keeprows | ||
@assert length(keys) > 0 || idx == gd.idx | ||
@assert names(newparent, 1:length(gd.cols)) == names(parent(gd), gd.cols) | ||
# in this case we are sure that the result GroupedDataFrame has the | ||
# same structure as the source | ||
# we do not copy data as it should be safe - we never mutate fields of gd | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When do we mutate fields now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we do not mutate fields now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather avoid making copies preventively as long as we never mutate fields. Especially if we drop There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK - changed. I have made an extensive implementation note, as |
||
return GroupedDataFrame(newparent, gd.cols, gd.groups, gd.idx, | ||
gd.starts, gd.ends, gd.ngroups, getfield(gd, :keymap)) | ||
# same structure as the source except that grouping columns are at the start | ||
Threads.lock(gd.lazy_lock) | ||
new_gd = GroupedDataFrame(newparent, collect(1:length(gd.cols)), gd.groups, | ||
getfield(gd, :idx), getfield(gd, :starts), | ||
getfield(gd, :ends), gd.ngroups, | ||
getfield(gd, :keymap), Threads.ReentrantLock()) | ||
Threads.lock(gd.lazy_lock) | ||
return new_gd | ||
else | ||
groups = gen_groups(idx) | ||
@assert groups[end] <= length(gd) | ||
@assert names(newparent, 1:length(gd.cols)) == names(parent(gd), gd.cols) | ||
return GroupedDataFrame(newparent, collect(1:length(gd.cols)), groups, | ||
nothing, nothing, nothing, groups[end], nothing) | ||
nothing, nothing, nothing, groups[end], nothing, | ||
Threads.ReentrantLock()) | ||
end | ||
else | ||
if ungroup | ||
return keepkeys ? parent(gd)[1:0, gd.cols] : DataFrame() | ||
if keeprows | ||
if nrow(parent(gd)) > 0 | ||
throw(ArgumentError("select and transform do not support " * | ||
"`GroupedDataFrame`s from which some groups have "* | ||
"been dropped (including skipmissing=true)")) | ||
end | ||
if ungroup | ||
return keepkeys ? select(parent(gd), gd.cols, copycols=copycols) : DataFrame() | ||
else | ||
return GroupedDataFrame(select(parent(gd), gd.cols, copycols=copycols), | ||
collect(1:length(gd.cols)), | ||
Int[], Int[], Int[], Int[], 0, Dict{Any,Int}(), | ||
Threads.ReentrantLock()) | ||
end | ||
else | ||
return GroupedDataFrame(parent(gd)[1:0, gd.cols], collect(1:length(gd.cols)), | ||
Int[], Int[], Int[], Int[], 0, Dict{Any,Int}()) | ||
if ungroup | ||
return keepkeys ? parent(gd)[1:0, gd.cols] : DataFrame() | ||
else | ||
return GroupedDataFrame(parent(gd)[1:0, gd.cols], collect(1:length(gd.cols)), | ||
Int[], Int[], Int[], Int[], 0, Dict{Any,Int}(), | ||
Threads.ReentrantLock()) | ||
end | ||
end | ||
end | ||
end | ||
|
@@ -1086,6 +1118,7 @@ function _combine(f::AbstractVector{<:Pair}, | |
@assert all(x -> first(x) isa Union{Int, AbstractVector{Int}, AsTable}, f) | ||
@assert all(x -> last(x) isa Base.Callable, f) | ||
|
||
isempty(f) && return Int[], DataFrame() | ||
if keeprows | ||
if minimum(gd.groups) == 0 | ||
throw(ArgumentError("select and transform do not support " * | ||
|
@@ -1482,11 +1515,16 @@ end | |
select(gd::GroupedDataFrame, args...; | ||
copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true) | ||
|
||
Apply `args` to `gd` following the rules described in [`combine`](@ref) and return the | ||
result as a `DataFrame` if `ungroup=true` or `GroupedDataFrame` if `ungroup=false`. | ||
Apply `args` to `gd` following the rules described in [`combine`](@ref). | ||
|
||
If `ungroup=true` the result is a `DataFrame`. | ||
If `ungroup=false` the result is a `GroupedDataFrame` | ||
(in this case the returned value retains the order of groups of `gd`). | ||
bkamins marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
The `parent` of the returned value has as many rows as `parent(gd)`. If an operation | ||
in `args` returns a single value it is always broadcasted to have this number of rows. | ||
The `parent` of the returned value has as many rows as `parent(gd)` and | ||
in the same order, except when the returned value has no columns | ||
(in which case it has zero rows). If an operation in `args` returns | ||
a single value it is always broadcasted to have this number of rows. | ||
|
||
If `copycols=false` then do not perform copying of columns that are not transformed. | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about this again, shouldn't this line be moved inside
if getfield(gd, f) === nothing
? Ideally threads wouldn't have to lock each other when accessing indices once they have been computed. To avoid threads from computing the fields multiple times (which would be wasteful, though probably not problematic), we could checkgetfield(gd, f) === nothing
again after callinglock
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - I was also thinking about it. The solution is exactly as you say:
I just wanted to have a simpler implementation as
lock
/unlock
is cheap:and with threads often things are more tricky than they seem.
I will open a PR with the change you propose and we can discuss there.