Fix various bugs in split/apply/combine in 0.21 release #2280

Merged
merged 9 commits into from
Jun 23, 2020
6 changes: 4 additions & 2 deletions src/abstractdataframe/selection.jl
@@ -278,7 +278,8 @@ SELECT_ARG_RULES =
select!(df::DataFrame, args...)

Mutate `df` in place to retain only columns specified by `args...` and return it.
The result is guaranteed to have the same number of rows as `df`.
The result is guaranteed to have the same number of rows as `df`, except when no
columns are selected (in which case the result has zero rows).

$SELECT_ARG_RULES

@@ -373,7 +374,8 @@ transform!(df::DataFrame, args...) = select!(df, :, args...)
select(df::AbstractDataFrame, args...; copycols::Bool=true)

Create a new data frame that contains columns from `df` specified by `args` and
return it. The result is guaranteed to have the same number of rows as `df`.
return it. The result is guaranteed to have the same number of rows as `df`,
except when no columns are selected (in which case the result has zero rows).

If `df` is a `DataFrame` or `copycols=true` then column renaming and transformations
are supported.
41 changes: 34 additions & 7 deletions src/groupeddataframe/groupeddataframe.jl
@@ -1,3 +1,17 @@
# Implementation note
# There are two important design features of GroupedDataFrame
# 1. idx, starts, ends and keymap are by default left uninitialized;
# they get populated only on demand; this means that every GroupedDataFrame
# has lazy_lock field which is used to make sure that two threads concurrently
# do not try to create them. The lock should be used in every function that
# does a direct access to these fields via getfield.
# 2. Except for point 1 above currently fields of GroupedDataFrame are never
# mutated after it is created. This means that internally when copying
# a GroupedDataFrame they are not copied for efficiency. If in the future
# operations that mutate GroupedDataFrame are introduced all non-copying
# passing of the internal fields to a new GroupedDataFrame should be
# updated. Currently this applies to `getindex` and `combine_helper` functions

"""
GroupedDataFrame

@@ -15,6 +29,8 @@ mutable struct GroupedDataFrame{T<:AbstractDataFrame}
ends::Union{Vector{Int},Nothing} # ends of groups after permutation by idx
ngroups::Int # number of groups
keymap::Union{Dict{Any,Int},Nothing} # mapping of key tuples to group indices
lazy_lock::Threads.ReentrantLock # lock is needed to make lazy operations
# thread safe
end

function genkeymap(gd, cols)
@@ -34,15 +50,19 @@ end
function Base.getproperty(gd::GroupedDataFrame, f::Symbol)
if f in (:idx, :starts, :ends)
# Group indices are computed lazily the first time they are accessed
Threads.lock(gd.lazy_lock)
Member

Thinking about this again, shouldn't this line be moved inside if getfield(gd, f) === nothing? Ideally threads wouldn't have to lock each other out when accessing indices once they have been computed. To prevent threads from computing the fields multiple times (which would be wasteful, though probably not problematic), we could check getfield(gd, f) === nothing again after calling lock.

Member Author

Yes - I was also thinking about it. The solution is exactly as you say:

we could check getfield(gd, f) === nothing again after calling lock.

I just wanted to have a simpler implementation as lock/unlock is cheap:

julia> function f(l)
       lock(l)
       unlock(l)
       end
f (generic function with 1 method)

julia> l = Threads.ReentrantLock()
ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0)

julia> f(l)

julia> @benchmark f($l)
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     26.066 ns (0.00% GC)
  median time:      26.087 ns (0.00% GC)
  mean time:        27.893 ns (0.00% GC)
  maximum time:     77.285 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     993

and with threads often things are more tricky than they seem.

I will open a PR with the change you propose and we can discuss there.
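
For reference, a minimal sketch of the check-lock-check pattern discussed in this thread. It uses a hypothetical stand-in type (`LazyIndex`, `get_idx` and `ncomputed` are illustrative names, not part of DataFrames.jl) and `sortperm` as a placeholder for the real index computation. It assumes the unsynchronized first read of the field is acceptable; on recent Julia versions an `@atomic` field may be the safer choice for that read.

```julia
# Hypothetical stand-in for GroupedDataFrame: one lazily computed field
# guarded by a lock. None of these names come from DataFrames.jl.
mutable struct LazyIndex
    groups::Vector{Int}
    idx::Union{Vector{Int},Nothing}   # populated on first access
    lazy_lock::ReentrantLock
    ncomputed::Int                    # how many times idx was computed
end

LazyIndex(groups) = LazyIndex(groups, nothing, ReentrantLock(), 0)

function get_idx(li::LazyIndex)
    # Fast path: once idx is populated, no lock is taken.
    if li.idx === nothing
        lock(li.lazy_lock)
        try
            # Second check: another task may have filled idx while this
            # one was waiting for the lock.
            if li.idx === nothing
                li.ncomputed += 1
                li.idx = sortperm(li.groups)  # placeholder computation
            end
        finally
            # Release the lock even if the computation throws.
            unlock(li.lazy_lock)
        end
    end
    return li.idx::Vector{Int}
end

li = LazyIndex([2, 1, 2, 1])
tasks = [Threads.@spawn get_idx(li) for _ in 1:8]
results = fetch.(tasks)
```

The `try`/`finally` is a design choice of this sketch rather than something taken from the diff: it keeps the lock from staying held if the computation fails.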

if getfield(gd, f) === nothing
gd.idx, gd.starts, gd.ends = compute_indices(gd.groups, gd.ngroups)
end
Threads.unlock(gd.lazy_lock)
return getfield(gd, f)::Vector{Int}
elseif f === :keymap
# Keymap is computed lazily the first time it is accessed
Threads.lock(gd.lazy_lock)
if getfield(gd, f) === nothing
gd.keymap = genkeymap(gd, ntuple(i -> parent(gd)[!, gd.cols[i]], length(gd.cols)))
end
Threads.unlock(gd.lazy_lock)
return getfield(gd, f)::Dict{Any,Int}
else
return getfield(gd, f)
@@ -190,14 +210,19 @@ function Base.getindex(gd::GroupedDataFrame, idxs::AbstractVector{<:Integer})
end
end
GroupedDataFrame(gd.parent, gd.cols, new_groups, gd.idx,
new_starts, new_ends, length(new_starts), nothing)
new_starts, new_ends, length(new_starts), nothing,
Threads.ReentrantLock())
end

# Index with colon (creates copy)
Base.getindex(gd::GroupedDataFrame, idxs::Colon) =
GroupedDataFrame(gd.parent, gd.cols, gd.groups, getfield(gd, :idx),
getfield(gd, :starts), getfield(gd, :ends), gd.ngroups,
getfield(gd, :keymap))
function Base.getindex(gd::GroupedDataFrame, idxs::Colon)
Threads.lock(gd.lazy_lock)
new_gd = GroupedDataFrame(gd.parent, gd.cols, gd.groups, getfield(gd, :idx),
getfield(gd, :starts), getfield(gd, :ends), gd.ngroups,
getfield(gd, :keymap), Threads.ReentrantLock())
Threads.unlock(gd.lazy_lock)
return new_gd
end


#
@@ -299,8 +324,10 @@ end
# The full version (to_indices) is required rather than to_index even though
# GroupedDataFrame behaves as a 1D array due to the behavior of Colon and Not.
# Note that this behavior would be the default if it was <:AbstractArray
Base.getindex(gd::GroupedDataFrame, idx...) =
getindex(gd, Base.to_indices(gd, idx)...)
function Base.getindex(gd::GroupedDataFrame, idx...)
length(idx) == 1 || throw(ArgumentError("GroupedDataFrame requires a single index"))
return getindex(gd, Base.to_indices(gd, idx)...)
end

# The allowed key types for dictionary-like indexing
const GroupKeyTypes = Union{GroupKey, Tuple, NamedTuple}
92 changes: 65 additions & 27 deletions src/groupeddataframe/splitapplycombine.jl
@@ -140,15 +140,17 @@ function groupby(df::AbstractDataFrame, cols;
intcols = idxcols isa Int ? [idxcols] : convert(Vector{Int}, idxcols)
if isempty(intcols)
return GroupedDataFrame(df, intcols, ones(Int, nrow(df)),
collect(axes(df, 1)), [1], [nrow(df)], 1, nothing)
collect(axes(df, 1)), [1], [nrow(df)], 1, nothing,
Threads.ReentrantLock())
end
sdf = df[!, intcols]

groups = Vector{Int}(undef, nrow(df))
ngroups, rhashes, gslots, sorted =
row_group_slots(ntuple(i -> sdf[!, i], ncol(sdf)), Val(false), groups, skipmissing)

gd = GroupedDataFrame(df, intcols, groups, nothing, nothing, nothing, ngroups, nothing)
gd = GroupedDataFrame(df, intcols, groups, nothing, nothing, nothing, ngroups, nothing,
Threads.ReentrantLock())

# sort groups if row_group_slots hasn't already done that
if sort && !sorted
@@ -237,6 +239,8 @@ const F_ARGUMENT_RULES =
If the first or last argument is a function `fun`, it is passed a [`SubDataFrame`](@ref)
view for each group and can return any return value defined below.
Note that this form is slower than `pair` or `args` due to type instability.

If `gd` has zero groups then no transformations are applied.
"""

const KWARG_PROCESSING_RULES =
@@ -245,9 +249,13 @@ const KWARG_PROCESSING_RULES =
in addition to those generated. In this case if the returned
value contains columns with the same names as the grouping columns, they are
required to be equal.
If `keepkeys=false` and some generated columns have the same name as grouping columns,
they are kept and are not required to be equal to grouping columns.

If `ungroup=true` (the default) a `DataFrame` is returned.
If `ungroup=false` a `GroupedDataFrame` grouped using `keycols(gdf)` is returned.

If `gd` has zero groups then no transformations are applied.
"""

"""
@@ -471,7 +479,6 @@ function _combine_prepare(gd::GroupedDataFrame,
@nospecialize(cs::Union{Pair, typeof(nrow),
ColumnIndex, MultiColumnIndex}...);
keepkeys::Bool, ungroup::Bool, copycols::Bool, keeprows::Bool)
@assert !isempty(cs)
cs_vec = []
for p in cs
if p === nrow
@@ -548,13 +555,12 @@ function _combine_prepare(gd::GroupedDataFrame,
end

function combine(gd::GroupedDataFrame; f...)
if length(f) == 0
throw(ArgumentError("combine(gd) is not allowed, use DataFrame(gd) " *
"to combine a GroupedDataFrame into a DataFrame"))
if length(f) > 0
Base.depwarn("`combine(gd; target_col = source_cols => fun, ...)` is deprecated" *
", use `combine(gd, source_cols => fun => :target_col, ...)` instead",
:combine)
end
Base.depwarn("`combine(gd; target_col = source_cols => fun, ...)` is deprecated" *
", use `combine(gd, source_cols => fun => :target_col, ...)` instead",
:combine)
# in the future handle keepkeys and ungroup
return combine(gd, [source_cols => fun => out_col for (out_col, (source_cols, fun)) in f])
end

@@ -597,33 +603,59 @@ function combine_helper(f, gd::GroupedDataFrame,
else
newparent = parent(gd)[idx, gd.cols]
end
hcat!(newparent, select(valscat, Not(intersect(keys, _names(valscat))), copycols=false),
hcat!(newparent,
select(valscat, Not(intersect(keys, _names(valscat))), copycols=false),
copycols=false)
ungroup && return newparent

if length(idx) == 0
if length(idx) == 0 && !(keeprows && length(keys) > 0)
@assert nrow(newparent) == 0
return GroupedDataFrame(newparent, collect(1:length(gd.cols)), Int[],
Int[], Int[], Int[], 0, Dict{Any,Int}())
end
if keeprows
Int[], Int[], Int[], 0, Dict{Any,Int}(),
Threads.ReentrantLock())
elseif keeprows
@assert length(keys) > 0 || idx == gd.idx
@assert names(newparent, 1:length(gd.cols)) == names(parent(gd), gd.cols)
# in this case we are sure that the result GroupedDataFrame has the
# same structure as the source
# we do not copy data as it should be safe - we never mutate fields of gd
Member

When do we mutate fields now?

Member Author

bkamins, Jun 19, 2020

We do not mutate fields now.
It is a preparation for filter!. But I prefer to make a copy here, as it is not very expensive, to be safe: if someone introduces mutation to GroupedDataFrame in the future (even if we skip filter! for now, as I assume we will), this line does not get forgotten. It is easy to forget, in such a large codebase, that this line assumes the fields are never mutated.

Member

I'd rather avoid making copies preventively as long as we never mutate fields. Especially if we drop filter! for now.

Member Author

OK - changed. I have made an extensive implementation note, as GroupedDataFrame is currently quite tricky to implement new functionality for.
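
To illustrate the trade-off in this thread, a small sketch using a hypothetical `Grouping` struct (not the real GroupedDataFrame): passing an internal vector to a new object without copying is free, but any later in-place mutation becomes visible through both objects, while a defensive copy stays isolated.

```julia
# Hypothetical miniature of a grouped structure; not the DataFrames.jl type.
struct Grouping
    groups::Vector{Int}
end

original = Grouping([1, 1, 2])
shared = Grouping(original.groups)        # no copy: both wrap the same vector
copied = Grouping(copy(original.groups))  # defensive copy

# An in-place mutation, of the kind a future mutating operation might perform.
original.groups[1] = 2

shared.groups  # reflects the mutation: [2, 1, 2]
copied.groups  # unaffected: [1, 1, 2]
```

As long as no operation mutates the fields, the non-copying form is safe, which is the assumption the implementation note records.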

return GroupedDataFrame(newparent, gd.cols, gd.groups, gd.idx,
gd.starts, gd.ends, gd.ngroups, getfield(gd, :keymap))
# same structure as the source except that grouping columns are at the start
Threads.lock(gd.lazy_lock)
new_gd = GroupedDataFrame(newparent, collect(1:length(gd.cols)), gd.groups,
getfield(gd, :idx), getfield(gd, :starts),
getfield(gd, :ends), gd.ngroups,
getfield(gd, :keymap), Threads.ReentrantLock())
Threads.unlock(gd.lazy_lock)
return new_gd
else
groups = gen_groups(idx)
@assert groups[end] <= length(gd)
@assert names(newparent, 1:length(gd.cols)) == names(parent(gd), gd.cols)
return GroupedDataFrame(newparent, collect(1:length(gd.cols)), groups,
nothing, nothing, nothing, groups[end], nothing)
nothing, nothing, nothing, groups[end], nothing,
Threads.ReentrantLock())
end
else
if ungroup
return keepkeys ? parent(gd)[1:0, gd.cols] : DataFrame()
if keeprows
if nrow(parent(gd)) > 0
throw(ArgumentError("select and transform do not support " *
"`GroupedDataFrame`s from which some groups have "*
"been dropped (including skipmissing=true)"))
end
if ungroup
return keepkeys ? select(parent(gd), gd.cols, copycols=copycols) : DataFrame()
else
return GroupedDataFrame(select(parent(gd), gd.cols, copycols=copycols),
collect(1:length(gd.cols)),
Int[], Int[], Int[], Int[], 0, Dict{Any,Int}(),
Threads.ReentrantLock())
end
else
return GroupedDataFrame(parent(gd)[1:0, gd.cols], collect(1:length(gd.cols)),
Int[], Int[], Int[], Int[], 0, Dict{Any,Int}())
if ungroup
return keepkeys ? parent(gd)[1:0, gd.cols] : DataFrame()
else
return GroupedDataFrame(parent(gd)[1:0, gd.cols], collect(1:length(gd.cols)),
Int[], Int[], Int[], Int[], 0, Dict{Any,Int}(),
Threads.ReentrantLock())
end
end
end
end
@@ -1086,6 +1118,7 @@ function _combine(f::AbstractVector{<:Pair},
@assert all(x -> first(x) isa Union{Int, AbstractVector{Int}, AsTable}, f)
@assert all(x -> last(x) isa Base.Callable, f)

isempty(f) && return Int[], DataFrame()
if keeprows
if minimum(gd.groups) == 0
throw(ArgumentError("select and transform do not support " *
@@ -1482,11 +1515,16 @@ end
select(gd::GroupedDataFrame, args...;
copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true)

Apply `args` to `gd` following the rules described in [`combine`](@ref) and return the
result as a `DataFrame` if `ungroup=true` or `GroupedDataFrame` if `ungroup=false`.
Apply `args` to `gd` following the rules described in [`combine`](@ref).

If `ungroup=true` the result is a `DataFrame`.
If `ungroup=false` the result is a `GroupedDataFrame`
(in this case the returned value retains the order of groups of `gd`).

The `parent` of the returned value has as many rows as `parent(gd)`. If an operation
in `args` returns a single value it is always broadcasted to have this number of rows.
The `parent` of the returned value has as many rows as `parent(gd)` and
in the same order, except when the returned value has no columns
(in which case it has zero rows). If an operation in `args` returns
a single value it is always broadcasted to have this number of rows.

If `copycols=false` then do not perform copying of columns that are not transformed.
