From 6594e787deaf3d45901c6cf5e4ca061a2f26b272 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 2 Dec 2020 09:17:01 +0100 Subject: [PATCH 01/22] Spawn one task per operation --- src/DataFrames.jl | 2 +- src/groupeddataframe/splitapplycombine.jl | 180 ++++++++++++---------- 2 files changed, 97 insertions(+), 85 deletions(-) diff --git a/src/DataFrames.jl b/src/DataFrames.jl index 0715c40a3e..beea5e3979 100644 --- a/src/DataFrames.jl +++ b/src/DataFrames.jl @@ -3,7 +3,7 @@ module DataFrames using Statistics, Printf, REPL using Reexport, SortingAlgorithms, Compat, Unicode, PooledArrays @reexport using Missings, InvertedIndices -using Base.Sort, Base.Order, Base.Iterators +using Base.Sort, Base.Order, Base.Iterators, Base.Threads using TableTraits, IteratorInterfaceExtensions import LinearAlgebra: norm using Markdown diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 5bbe77496a..fcaaf15bfe 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -202,16 +202,18 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S agg = check_aggregate(first(last(cs_i)), incol) outcol = agg(incol, gd) - if haskey(seen_cols, out_col_name) - optional, loc = seen_cols[out_col_name] - # we have seen this col but it is not allowed to replace it - optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) - @assert trans_res[loc].optional && trans_res[loc].name == out_col_name - trans_res[loc] = TransformationResult(idx_agg, outcol, out_col_name, optional_i) - seen_cols[out_col_name] = (optional_i, loc) - else - push!(trans_res, TransformationResult(idx_agg, outcol, out_col_name, optional_i)) - seen_cols[out_col_name] = (optional_i, length(trans_res)) + Threads.lock(gd.lazy_lock) do + if haskey(seen_cols, out_col_name) + optional, loc = seen_cols[out_col_name] + # we have seen this col but it is not allowed to replace it + 
optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) + @assert trans_res[loc].optional && trans_res[loc].name == out_col_name + trans_res[loc] = TransformationResult(idx_agg, outcol, out_col_name, optional_i) + seen_cols[out_col_name] = (optional_i, loc) + else + push!(trans_res, TransformationResult(idx_agg, outcol, out_col_name, optional_i)) + seen_cols[out_col_name] = (optional_i, length(trans_res)) + end end end @@ -231,24 +233,26 @@ function _combine_process_noop(cs_i::Pair{<:Union{Int, AbstractVector{Int}}, Pai end outcol = parentdf[!, first(source_cols)] - if haskey(seen_cols, out_col_name) - optional, loc = seen_cols[out_col_name] - @assert trans_res[loc].name == out_col_name - if optional - if !optional_i - @assert trans_res[loc].optional - trans_res[loc] = TransformationResult(idx_keeprows, copycols ? copy(outcol) : outcol, - out_col_name, optional_i) - seen_cols[out_col_name] = (optional_i, loc) + Threads.lock(gd.lazy_lock) do + if haskey(seen_cols, out_col_name) + optional, loc = seen_cols[out_col_name] + @assert trans_res[loc].name == out_col_name + if optional + if !optional_i + @assert trans_res[loc].optional + trans_res[loc] = TransformationResult(idx_keeprows, copycols ? copy(outcol) : outcol, + out_col_name, optional_i) + seen_cols[out_col_name] = (optional_i, loc) + end + else + # if optional_i is true, then we ignore processing this column + optional_i || throw(ArgumentError("duplicate output column name: :$out_col_name")) end else - # if optional_i is true, then we ignore processing this column - optional_i || throw(ArgumentError("duplicate output column name: :$out_col_name")) + push!(trans_res, TransformationResult(idx_keeprows, copycols ? copy(outcol) : outcol, + out_col_name, optional_i)) + seen_cols[out_col_name] = (optional_i, length(trans_res)) end - else - push!(trans_res, TransformationResult(idx_keeprows, copycols ? 
copy(outcol) : outcol, - out_col_name, optional_i)) - seen_cols[out_col_name] = (optional_i, length(trans_res)) end end @@ -275,23 +279,25 @@ function _combine_process_callable(@nospecialize(cs_i::Base.Callable), idx = idx_agg end @assert length(outcols) == length(nms) - for j in eachindex(outcols) - outcol = outcols[j] - out_col_name = nms[j] - if haskey(seen_cols, out_col_name) - optional, loc = seen_cols[out_col_name] - # if column was seen and it is optional now ignore it - if !optional_i + Threads.lock(gd.lazy_lock) do + for j in eachindex(outcols) + outcol = outcols[j] + out_col_name = nms[j] + if haskey(seen_cols, out_col_name) optional, loc = seen_cols[out_col_name] - # we have seen this col but it is not allowed to replace it - optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) - @assert trans_res[loc].optional && trans_res[loc].name == out_col_name - trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i) - seen_cols[out_col_name] = (optional_i, loc) + # if column was seen and it is optional now ignore it + if !optional_i + optional, loc = seen_cols[out_col_name] + # we have seen this col but it is not allowed to replace it + optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) + @assert trans_res[loc].optional && trans_res[loc].name == out_col_name + trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i) + seen_cols[out_col_name] = (optional_i, loc) + end + else + push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i)) + seen_cols[out_col_name] = (optional_i, length(trans_res)) end - else - push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i)) - seen_cols[out_col_name] = (optional_i, length(trans_res)) end end return idx_agg @@ -330,19 +336,21 @@ function _combine_process_pair_symbol(optional_i::Bool, @assert length(outcols) == 1 outcol = outcols[1] - if haskey(seen_cols, out_col_name) - # if column was seen 
and it is optional now ignore it - if !optional_i - optional, loc = seen_cols[out_col_name] - # we have seen this col but it is not allowed to replace it - optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) - @assert trans_res[loc].optional && trans_res[loc].name == out_col_name - trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i) - seen_cols[out_col_name] = (optional_i, loc) + Threads.lock(gd.lazy_lock) do + if haskey(seen_cols, out_col_name) + # if column was seen and it is optional now ignore it + if !optional_i + optional, loc = seen_cols[out_col_name] + # we have seen this col but it is not allowed to replace it + optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) + @assert trans_res[loc].optional && trans_res[loc].name == out_col_name + trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i) + seen_cols[out_col_name] = (optional_i, loc) + end + else + push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i)) + seen_cols[out_col_name] = (optional_i, length(trans_res)) end - else - push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i)) - seen_cols[out_col_name] = (optional_i, length(trans_res)) end return idx_agg end @@ -405,23 +413,25 @@ function _combine_process_pair_astable(optional_i::Bool, nms = out_col_name end end - for j in eachindex(outcols) - outcol = outcols[j] - out_col_name = nms[j] - if haskey(seen_cols, out_col_name) - optional, loc = seen_cols[out_col_name] - # if column was seen and it is optional now ignore it - if !optional_i + Threads.lock(gd.lazy_lock) do + for j in eachindex(outcols) + outcol = outcols[j] + out_col_name = nms[j] + if haskey(seen_cols, out_col_name) optional, loc = seen_cols[out_col_name] - # we have seen this col but it is not allowed to replace it - optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) - @assert trans_res[loc].optional && 
trans_res[loc].name == out_col_name - trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i) - seen_cols[out_col_name] = (optional_i, loc) + # if column was seen and it is optional now ignore it + if !optional_i + optional, loc = seen_cols[out_col_name] + # we have seen this col but it is not allowed to replace it + optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) + @assert trans_res[loc].optional && trans_res[loc].name == out_col_name + trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i) + seen_cols[out_col_name] = (optional_i, loc) + end + else + push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i)) + seen_cols[out_col_name] = (optional_i, length(trans_res)) end - else - push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i)) - seen_cols[out_col_name] = (optional_i, length(trans_res)) end end return idx_agg @@ -523,37 +533,39 @@ function _combine(gd::GroupedDataFrame, # and if a given column can be replaced in the future seen_cols = Dict{Symbol, Tuple{Bool, Int}}() + tasks = similar(cs_norm, Task) + parentdf = parent(gd) - for i in eachindex(cs_norm, optional_transform) + for i in eachindex(cs_norm, optional_transform, tasks) cs_i = cs_norm[i] optional_i = optional_transform[i] - if length(gd) > 0 && isagg(cs_i, gd) + tasks[i] = Threads.@spawn if length(gd) > 0 && isagg(cs_i, gd) _combine_process_agg(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) elseif keeprows && cs_i isa Pair && first(last(cs_i)) === identity && !(first(cs_i) isa AsTable) && (last(last(cs_i)) isa Symbol) # this is a fast path used when we pass a column or rename a column in select or transform _combine_process_noop(cs_i, optional_i, parentdf, seen_cols, trans_res, idx_keeprows, copycols) elseif cs_i isa Base.Callable - idx_callable = _combine_process_callable(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) - if idx_callable !== nothing - if 
idx_agg === nothing - idx_agg = idx_callable - else - @assert idx_agg === idx_callable - end - end + _combine_process_callable(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) else @assert cs_i isa Pair - idx_pair = _combine_process_pair(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) - if idx_pair !== nothing - if idx_agg === nothing - idx_agg = idx_pair - else - @assert idx_agg === idx_pair - end + _combine_process_pair(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) + end + end + for t in tasks + try + idx = fetch(t) + catch e + if e isa TaskFailedException + rethrow(t.exception) + else + rethrow(e) end end + if idx_agg === nothing && idx !== nothing + idx_agg = idx + end end isempty(trans_res) && return Int[], DataFrame() From 8e2904806ec30e96fdfd047bb13daacc7710d9bf Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 5 Dec 2020 13:11:03 +0100 Subject: [PATCH 02/22] Fix handling of idx_agg and order of columns --- src/groupeddataframe/splitapplycombine.jl | 106 ++++++++++++---------- 1 file changed, 60 insertions(+), 46 deletions(-) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index fcaaf15bfe..ad469aed2d 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -194,7 +194,7 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Union{Nothing, AbstractVector{Int}}) + idx_agg::Ref{Union{Nothing, Vector{Int}}}) @assert isagg(cs_i, gd) @assert !optional_i out_col_name = last(last(cs_i)) @@ -202,18 +202,19 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S agg = check_aggregate(first(last(cs_i)), incol) outcol = agg(incol, gd) - Threads.lock(gd.lazy_lock) do + return function() if haskey(seen_cols, out_col_name) optional, loc = 
seen_cols[out_col_name] # we have seen this col but it is not allowed to replace it optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) @assert trans_res[loc].optional && trans_res[loc].name == out_col_name - trans_res[loc] = TransformationResult(idx_agg, outcol, out_col_name, optional_i) + trans_res[loc] = TransformationResult(idx_agg[], outcol, out_col_name, optional_i) seen_cols[out_col_name] = (optional_i, loc) else - push!(trans_res, TransformationResult(idx_agg, outcol, out_col_name, optional_i)) + push!(trans_res, TransformationResult(idx_agg[], outcol, out_col_name, optional_i)) seen_cols[out_col_name] = (optional_i, length(trans_res)) end + return nothing end end @@ -233,7 +234,7 @@ function _combine_process_noop(cs_i::Pair{<:Union{Int, AbstractVector{Int}}, Pai end outcol = parentdf[!, first(source_cols)] - Threads.lock(gd.lazy_lock) do + return function() if haskey(seen_cols, out_col_name) optional, loc = seen_cols[out_col_name] @assert trans_res[loc].name == out_col_name @@ -253,6 +254,7 @@ function _combine_process_noop(cs_i::Pair{<:Union{Int, AbstractVector{Int}}, Pai out_col_name, optional_i)) seen_cols[out_col_name] = (optional_i, length(trans_res)) end + return nothing end end @@ -263,23 +265,25 @@ function _combine_process_callable(@nospecialize(cs_i::Base.Callable), gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Union{Nothing, AbstractVector{Int}}) + idx_agg::Ref{Union{Nothing, Vector{Int}}}) firstres = length(gd) > 0 ? cs_i(gd[1]) : cs_i(similar(parentdf, 0)) idx, outcols, nms = _combine_multicol(firstres, cs_i, gd, nothing) if !(firstres isa Union{AbstractVecOrMat, AbstractDataFrame, NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}) - # if idx_agg was not computed yet it is nothing - # in this case if we are not passed a vector compute it. 
- if isnothing(idx_agg) - idx_agg = Vector{Int}(undef, length(gd)) - fillfirst!(nothing, idx_agg, 1:length(gd.groups), gd) + lock(gd.lazy_lock) do + # if idx_agg was not computed yet it is nothing + # in this case if we are not passed a vector compute it. + if isnothing(idx_agg[]) + idx_agg[] = Vector{Int}(undef, length(gd)) + fillfirst!(nothing, idx_agg[], 1:length(gd.groups), gd) + end + @assert idx == idx_agg[] + idx = idx_agg[] end - @assert idx == idx_agg - idx = idx_agg end @assert length(outcols) == length(nms) - Threads.lock(gd.lazy_lock) do + return function() for j in eachindex(outcols) outcol = outcols[j] out_col_name = nms[j] @@ -299,8 +303,8 @@ function _combine_process_callable(@nospecialize(cs_i::Base.Callable), seen_cols[out_col_name] = (optional_i, length(trans_res)) end end + return idx_agg[] end - return idx_agg end # perform a transformation specified using the Pair notation with a single output column @@ -308,7 +312,7 @@ function _combine_process_pair_symbol(optional_i::Bool, gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Union{Nothing, AbstractVector{Int}}, + idx_agg::Ref{Union{Nothing, Vector{Int}}}, out_col_name::Symbol, firstmulticol::Bool, firstres::Any, @@ -319,9 +323,11 @@ function _combine_process_pair_symbol(optional_i::Bool, end # if idx_agg was not computed yet it is nothing # in this case if we are not passed a vector compute it. 
- if !(firstres isa AbstractVector) && isnothing(idx_agg) - idx_agg = Vector{Int}(undef, length(gd)) - fillfirst!(nothing, idx_agg, 1:length(gd.groups), gd) + lock(gd.lazy_lock) do + if !(firstres isa AbstractVector) && isnothing(idx_agg[]) + idx_agg[] = Vector{Int}(undef, length(gd)) + fillfirst!(nothing, idx_agg[], 1:length(gd.groups), gd) + end end # TODO: if firstres is a vector we recompute idx for every function # this could be avoided - it could be computed only the first time @@ -332,11 +338,11 @@ function _combine_process_pair_symbol(optional_i::Bool, # nothing to signal that idx has to be computed in _combine_with_first idx, outcols, _ = _combine_with_first(wrap(firstres), fun, gd, incols, Val(firstmulticol), - firstres isa AbstractVector ? nothing : idx_agg) + firstres isa AbstractVector ? nothing : idx_agg[]) @assert length(outcols) == 1 outcol = outcols[1] - Threads.lock(gd.lazy_lock) do + return function() if haskey(seen_cols, out_col_name) # if column was seen and it is optional now ignore it if !optional_i @@ -351,8 +357,8 @@ function _combine_process_pair_symbol(optional_i::Bool, push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i)) seen_cols[out_col_name] = (optional_i, length(trans_res)) end + return idx_agg[] end - return idx_agg end # perform a transformation specified using the Pair notation with multiple output columns @@ -360,7 +366,7 @@ function _combine_process_pair_astable(optional_i::Bool, gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Union{Nothing, AbstractVector{Int}}, + idx_agg::Ref{Union{Nothing, Vector{Int}}}, out_col_name::Union{Type{AsTable}, AbstractVector{Symbol}}, firstmulticol::Bool, firstres::Any, @@ -394,14 +400,16 @@ function _combine_process_pair_astable(optional_i::Bool, if !(firstres isa Union{AbstractVecOrMat, AbstractDataFrame, NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}) - # if idx_agg was not computed yet it is 
nothing - # in this case if we are not passed a vector compute it. - if isnothing(idx_agg) - idx_agg = Vector{Int}(undef, length(gd)) - fillfirst!(nothing, idx_agg, 1:length(gd.groups), gd) + lock(gd.lazy_lock) do + # if idx_agg was not computed yet it is nothing + # in this case if we are not passed a vector compute it. + if isnothing(idx_agg[]) + idx_agg[] = Vector{Int}(undef, length(gd)) + fillfirst!(nothing, idx_agg[], 1:length(gd.groups), gd) + end + @assert idx == idx_agg[] + idx = idx_agg[] end - @assert idx == idx_agg - idx = idx_agg end @assert length(outcols) == length(nms) end @@ -413,7 +421,7 @@ function _combine_process_pair_astable(optional_i::Bool, nms = out_col_name end end - Threads.lock(gd.lazy_lock) do + return function() for j in eachindex(outcols) outcol = outcols[j] out_col_name = nms[j] @@ -433,8 +441,8 @@ function _combine_process_pair_astable(optional_i::Bool, seen_cols[out_col_name] = (optional_i, length(trans_res)) end end + return idx_agg[] end - return idx_agg end # perform a transformation specified using the Pair notation @@ -446,7 +454,7 @@ function _combine_process_pair(@nospecialize(cs_i::Pair), gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Union{Nothing, AbstractVector{Int}}) + idx_agg::Ref{Union{Nothing, Vector{Int}}}) source_cols, (fun, out_col_name) = cs_i if source_cols isa Int @@ -516,11 +524,11 @@ function _combine(gd::GroupedDataFrame, idx_keeprows = nothing end - idx_agg = nothing + idx_agg = Ref{Union{Nothing, Vector{Int}}}(nothing) if length(gd) > 0 && any(x -> isagg(x, gd), cs_norm) # Compute indices of representative rows only once for all AbstractAggregates - idx_agg = Vector{Int}(undef, length(gd)) - fillfirst!(nothing, idx_agg, 1:length(gd.groups), gd) + idx_agg[] = Vector{Int}(undef, length(gd)) + fillfirst!(nothing, idx_agg[], 1:length(gd.groups), gd) elseif length(gd) == 0 || !all(x -> isagg(x, gd), cs_norm) # Trigger computation of indices # 
This can speed up some aggregates that would not trigger this on their own @@ -554,39 +562,45 @@ function _combine(gd::GroupedDataFrame, end end for t in tasks + local postprocessf try - idx = fetch(t) + postprocessf = fetch(t) catch e if e isa TaskFailedException rethrow(t.exception) else - rethrow(e) + rethrow() end end - if idx_agg === nothing && idx !== nothing - idx_agg = idx + idx = postprocessf() + if idx !== nothing + if idx_agg[] === nothing + idx_agg[] = idx + else + @assert idx_agg[] === idx + end end end isempty(trans_res) && return Int[], DataFrame() # idx_agg === nothing then we have only functions that # returned multiple rows and idx_loc = 1 - idx_loc = findfirst(x -> x.col_idx !== idx_agg, trans_res) + idx_loc = findfirst(x -> x.col_idx !== idx_agg[], trans_res) if !keeprows && isnothing(idx_loc) - @assert !isnothing(idx_agg) - idx = idx_agg + @assert !isnothing(idx_agg[]) + idx = idx_agg[] else idx = keeprows ? idx_keeprows : trans_res[idx_loc].col_idx agg2idx_map = nothing for i in 1:length(trans_res) if trans_res[i].col_idx !== idx - if trans_res[i].col_idx === idx_agg + if trans_res[i].col_idx === idx_agg[] # we perform pseudo broadcasting here # keep -1 as a sentinel for errors if isnothing(agg2idx_map) - agg2idx_map = _agg2idx_map_helper(idx, idx_agg) + agg2idx_map = _agg2idx_map_helper(idx, idx_agg[]) end - trans_res[i] = TransformationResult(idx_agg, trans_res[i].col[agg2idx_map], + trans_res[i] = TransformationResult(idx_agg[], trans_res[i].col[agg2idx_map], trans_res[i].name, trans_res[i].optional) elseif idx != trans_res[i].col_idx if keeprows From 2ca83f96c10aea5d0d996f4f2f825f05d918e7b5 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sun, 6 Dec 2020 12:21:15 +0100 Subject: [PATCH 03/22] Use threading in another loop --- src/groupeddataframe/splitapplycombine.jl | 32 ++++++++++++----------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/groupeddataframe/splitapplycombine.jl 
b/src/groupeddataframe/splitapplycombine.jl index ad469aed2d..8fbc57d784 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -617,23 +617,25 @@ function _combine(gd::GroupedDataFrame, # here first field in trans_res[i] is used to keep track how the column was generated # a correct index is stored in idx variable - for i in eachindex(trans_res) - col_idx = trans_res[i].col_idx - col = trans_res[i].col - if keeprows && col_idx !== idx_keeprows # we need to reorder the column - newcol = similar(col) - # we can probably make it more efficient, but I leave it as an optimization for the future - gd_idx = gd.idx - k = 0 - # consider adding @inbounds later - for (s, e) in zip(gd.starts, gd.ends) - for j in s:e - k += 1 - newcol[gd_idx[j]] = col[k] + @sync for i in eachindex(trans_res) + Threads.@spawn begin + col_idx = trans_res[i].col_idx + col = trans_res[i].col + if keeprows && col_idx !== idx_keeprows # we need to reorder the column + newcol = similar(col) + # we can probably make it more efficient, but I leave it as an optimization for the future + gd_idx = gd.idx + k = 0 + # consider adding @inbounds later + for (s, e) in zip(gd.starts, gd.ends) + for j in s:e + k += 1 + newcol[gd_idx[j]] = col[k] + end end + @assert k == length(gd_idx) + trans_res[i] = TransformationResult(col_idx, newcol, trans_res[i].name, trans_res[i].optional) end - @assert k == length(gd_idx) - trans_res[i] = TransformationResult(col_idx, newcol, trans_res[i].name, trans_res[i].optional) end end From b9eb025c788efdf8788ab1cc6eb5e9312838e1f3 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 12 Dec 2020 17:52:55 +0100 Subject: [PATCH 04/22] Remove redundant idx handling --- src/groupeddataframe/splitapplycombine.jl | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 8fbc57d784..8a44568221 100644 --- 
a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -214,7 +214,6 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S push!(trans_res, TransformationResult(idx_agg[], outcol, out_col_name, optional_i)) seen_cols[out_col_name] = (optional_i, length(trans_res)) end - return nothing end end @@ -254,7 +253,6 @@ function _combine_process_noop(cs_i::Pair{<:Union{Int, AbstractVector{Int}}, Pai out_col_name, optional_i)) seen_cols[out_col_name] = (optional_i, length(trans_res)) end - return nothing end end @@ -303,7 +301,6 @@ function _combine_process_callable(@nospecialize(cs_i::Base.Callable), seen_cols[out_col_name] = (optional_i, length(trans_res)) end end - return idx_agg[] end end @@ -357,7 +354,6 @@ function _combine_process_pair_symbol(optional_i::Bool, push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i)) seen_cols[out_col_name] = (optional_i, length(trans_res)) end - return idx_agg[] end end @@ -441,7 +437,6 @@ function _combine_process_pair_astable(optional_i::Bool, seen_cols[out_col_name] = (optional_i, length(trans_res)) end end - return idx_agg[] end end @@ -475,7 +470,7 @@ function _combine_process_pair(@nospecialize(cs_i::Pair), if out_col_name isa Symbol return _combine_process_pair_symbol(optional_i, gd, seen_cols, trans_res, idx_agg, - out_col_name, firstmulticol, firstres, fun, incols) + out_col_name, firstmulticol, firstres, fun, incols) end if out_col_name == AsTable || out_col_name isa AbstractVector{Symbol} return _combine_process_pair_astable(optional_i, gd, seen_cols, trans_res, idx_agg, @@ -572,14 +567,7 @@ function _combine(gd::GroupedDataFrame, rethrow() end end - idx = postprocessf() - if idx !== nothing - if idx_agg[] === nothing - idx_agg[] = idx - else - @assert idx_agg[] === idx - end - end + postprocessf() end isempty(trans_res) && return Int[], DataFrame() From 487fce73c015f96b3ed3dd4ab18521ba4bd8e6be Mon Sep 17 00:00:00 2001 From: Milan 
Bouchet-Valat Date: Sat, 12 Dec 2020 18:44:20 +0100 Subject: [PATCH 05/22] Support Julia 1.0 --- src/DataFrames.jl | 8 ++++++++ src/groupeddataframe/splitapplycombine.jl | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/DataFrames.jl b/src/DataFrames.jl index beea5e3979..7e5c76e5bb 100644 --- a/src/DataFrames.jl +++ b/src/DataFrames.jl @@ -91,6 +91,14 @@ else export only end +if VERSION >= v"1.3" + using Base.Threads: @spawn +else + macro spawn(expr) + :(@task $expr) + end +end + include("other/utils.jl") include("other/index.jl") diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 8a44568221..c08cf2c4c6 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -543,7 +543,7 @@ function _combine(gd::GroupedDataFrame, cs_i = cs_norm[i] optional_i = optional_transform[i] - tasks[i] = Threads.@spawn if length(gd) > 0 && isagg(cs_i, gd) + tasks[i] = @spawn if length(gd) > 0 && isagg(cs_i, gd) _combine_process_agg(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) elseif keeprows && cs_i isa Pair && first(last(cs_i)) === identity && !(first(cs_i) isa AsTable) && (last(last(cs_i)) isa Symbol) From 78106eacc85330d0062eee1cfbc42f3212e837a1 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 12 Dec 2020 20:10:54 +0100 Subject: [PATCH 06/22] tmp --- src/DataFrames.jl | 11 ++++++++++- src/groupeddataframe/splitapplycombine.jl | 4 ++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/DataFrames.jl b/src/DataFrames.jl index 7e5c76e5bb..391b55ed77 100644 --- a/src/DataFrames.jl +++ b/src/DataFrames.jl @@ -94,8 +94,17 @@ end if VERSION >= v"1.3" using Base.Threads: @spawn else + # This is the definition of @async in Base macro spawn(expr) - :(@task $expr) + thunk = esc(:(()->($expr))) + var = esc(Base.sync_varname) + quote + local task = Task($thunk) + if $(Expr(:isdefined, var)) + push!($var, task) + end + 
schedule(task) + end end end diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index c08cf2c4c6..dfd838a657 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -561,7 +561,7 @@ function _combine(gd::GroupedDataFrame, try postprocessf = fetch(t) catch e - if e isa TaskFailedException + if VERSION >= v"1.3" && e isa TaskFailedException rethrow(t.exception) else rethrow() @@ -606,7 +606,7 @@ function _combine(gd::GroupedDataFrame, # a correct index is stored in idx variable @sync for i in eachindex(trans_res) - Threads.@spawn begin + @spawn begin col_idx = trans_res[i].col_idx col = trans_res[i].col if keeprows && col_idx !== idx_keeprows # we need to reorder the column From 97090f49bd50eb699d935fcc7108522cac76205e Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 12 Dec 2020 18:57:53 +0100 Subject: [PATCH 07/22] idx_agg can't be nothing --- src/groupeddataframe/splitapplycombine.jl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index dfd838a657..5ea03d5ae2 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -194,7 +194,7 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Ref{Union{Nothing, Vector{Int}}}) + idx_agg::AbstractVector{Int}) @assert isagg(cs_i, gd) @assert !optional_i out_col_name = last(last(cs_i)) @@ -208,10 +208,10 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S # we have seen this col but it is not allowed to replace it optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) @assert trans_res[loc].optional && trans_res[loc].name == out_col_name - trans_res[loc] = 
TransformationResult(idx_agg[], outcol, out_col_name, optional_i) + trans_res[loc] = TransformationResult(idx_agg, outcol, out_col_name, optional_i) seen_cols[out_col_name] = (optional_i, loc) else - push!(trans_res, TransformationResult(idx_agg[], outcol, out_col_name, optional_i)) + push!(trans_res, TransformationResult(idx_agg, outcol, out_col_name, optional_i)) seen_cols[out_col_name] = (optional_i, length(trans_res)) end end @@ -544,7 +544,7 @@ function _combine(gd::GroupedDataFrame, optional_i = optional_transform[i] tasks[i] = @spawn if length(gd) > 0 && isagg(cs_i, gd) - _combine_process_agg(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) + _combine_process_agg(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg[]) elseif keeprows && cs_i isa Pair && first(last(cs_i)) === identity && !(first(cs_i) isa AsTable) && (last(last(cs_i)) isa Symbol) # this is a fast path used when we pass a column or rename a column in select or transform From 801e54ec6a46aa44801aa38bf8378e499ae551b0 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 12 Dec 2020 21:38:52 +0100 Subject: [PATCH 08/22] Use default exception handling --- src/groupeddataframe/splitapplycombine.jl | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 5ea03d5ae2..202d9328e2 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -556,17 +556,10 @@ function _combine(gd::GroupedDataFrame, _combine_process_pair(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) end end + # Post-processing has to be run sequentially + # since the order of operations determines that of columns for t in tasks - local postprocessf - try - postprocessf = fetch(t) - catch e - if VERSION >= v"1.3" && e isa TaskFailedException - rethrow(t.exception) - else - rethrow() - end - end + postprocessf = fetch(t) 
postprocessf() end From 688e7c3d852b4799fe2eca67f623bbe3516961e5 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sun, 13 Dec 2020 14:43:00 +0100 Subject: [PATCH 09/22] Add test --- Project.toml | 3 ++- test/grouping.jl | 23 ++++++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/Project.toml b/Project.toml index 6d75a2e0e7..a8d39acb10 100644 --- a/Project.toml +++ b/Project.toml @@ -39,6 +39,7 @@ TableTraits = "0.4, 1" Tables = "1.2" [extras] +Combinatorics = "861a8166-3701-5b0c-9a16-15d98fcdc6aa" DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" DataValues = "e7dc6d0d-1eca-5fa6-8ad6-5aecde8b7ea5" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" @@ -47,4 +48,4 @@ Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["DataStructures", "DataValues", "Dates", "Logging", "Random", "Test"] +test = ["Combinatorics", "DataStructures", "DataValues", "Dates", "Logging", "Random", "Test"] diff --git a/test/grouping.jl b/test/grouping.jl index a2df4868ae..169e6a67bd 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -1,6 +1,7 @@ module TestGrouping -using Test, DataFrames, Random, Statistics, PooledArrays, CategoricalArrays, DataAPI +using Test, DataFrames, Random, Statistics, PooledArrays, CategoricalArrays, DataAPI, + Combinatorics const ≅ = isequal """Check if passed data frames are `isequal` and have the same element types of columns""" @@ -3272,4 +3273,24 @@ end @test df == df2 end +@testset "permutations of operations with combine" begin + df = DataFrame(id=rand(1:10, 20)) + gd = groupby(df, :id) + trans = [:id => (y -> sum(y)) => :v1, + :id => (y -> 10maximum(y)) => :v2, + y -> (v3=100y.id[1],), + y -> (v4=fill(1000y.id[1],y.id[1]+1),)] + + for p in permutations(1:4), i in 1:4 + res = combine(gd, trans[p[1:i]]...) + for j in 1:i + expected = nrow(res) <= 10 ? 
combine(gd, trans[p[j]]) : + # Second operation is there only to generate as many rows as in res + combine(gd, trans[p[j]], y -> (xxx=fill(1000y.id[1],y.id[1]+1),)) + nms = intersect(names(expected), names(res)) + @test res[!, nms] == expected[!, nms] + end + end +end + end # module From a3dd7fd484fcf8c3e7633fc56f38f3ef57ba14dc Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sun, 13 Dec 2020 15:15:23 +0100 Subject: [PATCH 10/22] Add @sync --- src/groupeddataframe/splitapplycombine.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 202d9328e2..844a1a5d22 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -539,7 +539,7 @@ function _combine(gd::GroupedDataFrame, tasks = similar(cs_norm, Task) parentdf = parent(gd) - for i in eachindex(cs_norm, optional_transform, tasks) + @sync for i in eachindex(cs_norm, optional_transform, tasks) cs_i = cs_norm[i] optional_i = optional_transform[i] From 4a12320e7f0676f0e24bc8a4c8bd7c70e2cbb78a Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sun, 20 Dec 2020 12:20:35 +0100 Subject: [PATCH 11/22] Fix exception type --- src/groupeddataframe/splitapplycombine.jl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 844a1a5d22..e41d1fba52 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -539,7 +539,7 @@ function _combine(gd::GroupedDataFrame, tasks = similar(cs_norm, Task) parentdf = parent(gd) - @sync for i in eachindex(cs_norm, optional_transform, tasks) + for i in eachindex(cs_norm, optional_transform, tasks) cs_i = cs_norm[i] optional_i = optional_transform[i] @@ -556,6 +556,16 @@ function _combine(gd::GroupedDataFrame, _combine_process_pair(cs_i, optional_i, parentdf, gd, seen_cols, 
trans_res, idx_agg) end end + # Workaround JuliaLang/julia#38931: + # we want to preserve the exception type thrown in user code, + # and print the backtrace corresponding to it + for t in tasks + try + wait(t) + catch e + throw(t.exception) + end + end # Post-processing has to be run sequentially # since the order of operations determines that of columns for t in tasks From adf35bf9ac84d70a0d1cfebcd4b9f2d7cf0b5fce Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Mon, 21 Dec 2020 22:48:25 +0100 Subject: [PATCH 12/22] More robust exception handling --- src/groupeddataframe/splitapplycombine.jl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index e41d1fba52..d552ced999 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -563,7 +563,11 @@ function _combine(gd::GroupedDataFrame, try wait(t) catch e - throw(t.exception) + if e isa TaskFailedException + throw(t.exception) + else + rethrow(e) + end end end # Post-processing has to be run sequentially From aed32c46c788f4d507f939549dddd0f559153936 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Mon, 21 Dec 2020 23:22:49 +0100 Subject: [PATCH 13/22] Fix Julia 1.0 --- src/groupeddataframe/splitapplycombine.jl | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index d552ced999..5726b3316f 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -563,10 +563,18 @@ function _combine(gd::GroupedDataFrame, try wait(t) catch e - if e isa TaskFailedException - throw(t.exception) + @static if VERSION > "v1.3" + if e isa TaskFailedException + throw(t.exception) + else + rethrow(e) + end else - rethrow(e) + if e isa ErrorException + throw(t.exception) + else + rethrow(e) + end end end end From 
e442ce54ad6c6d8b56f02f3e85ae7fbf2d17bb54 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Tue, 22 Dec 2020 11:19:14 +0100 Subject: [PATCH 14/22] Typo --- src/groupeddataframe/splitapplycombine.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 5726b3316f..4414ec43aa 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -563,7 +563,7 @@ function _combine(gd::GroupedDataFrame, try wait(t) catch e - @static if VERSION > "v1.3" + @static if VERSION > v"1.3" if e isa TaskFailedException throw(t.exception) else From 6d96c2a69d21ccfbbb533bc2e1afa1dc48692ac5 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 23 Dec 2020 14:46:06 +0100 Subject: [PATCH 15/22] Add `@spawn` test --- test/utils.jl | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/utils.jl b/test/utils.jl index ba1f507bef..5dd3a64623 100644 --- a/test/utils.jl +++ b/test/utils.jl @@ -94,4 +94,12 @@ end :sum_skipmissing_div12 end +@testset "pre-Julia 1.3 @spawn replacement" begin + t = @sync DataFrames.@spawn begin + sleep(1) + true + end + @test fetch(t) === true +end + end # module From 38b2511094bdf3bb84dccaf53cacf91637391072 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 16 Jan 2021 19:51:48 +0100 Subject: [PATCH 16/22] Call GC.safepoint every 100_000 rows --- src/groupeddataframe/fastaggregates.jl | 36 +++++++++++++++++--------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index 4bce0e6c5f..9d14d7345d 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -163,18 +163,30 @@ function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Boo counts = zeros(Int, n) end groups = gd.groups - @inbounds for i in eachindex(incol, groups) - gix = groups[i] - 
x = incol[i] - if gix > 0 && (condf === nothing || condf(x)) - # this check should be optimized out if U is not Any - if eltype(res) === Any && !isassigned(res, gix) - res[gix] = f(x, gix) - else - res[gix] = op(res[gix], f(x, gix)) - end - if adjust !== nothing || checkempty - counts[gix] += 1 + if VERSION >= v"1.4" && Threads.nthreads() > 1 + batchsize = 100_000 + else + batchsize = typemax(Int) + end + for batch in Iterators.partition(eachindex(incol, groups), batchsize) + # Allow other tasks to do garbage collection while this one runs + @static if VERSION >= v"1.4" + GC.safepoint() + end + + @inbounds for i in batch + gix = groups[i] + x = incol[i] + if gix > 0 && (condf === nothing || condf(x)) + # this check should be optimized out if U is not Any + if eltype(res) === Any && !isassigned(res, gix) + res[gix] = f(x, gix) + else + res[gix] = op(res[gix], f(x, gix)) + end + if adjust !== nothing || checkempty + counts[gix] += 1 + end end end end From 542cd1c349c78b2060843f2dc3efe2ddc0d55d6c Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sun, 17 Jan 2021 19:54:23 +0100 Subject: [PATCH 17/22] Review fixes --- src/abstractdataframe/selection.jl | 4 ++++ test/grouping.jl | 29 +++++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index e22ca022a4..65fdc6684f 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -143,6 +143,10 @@ const TRANSFORMATION_COMMON_RULES = returned and the same copying rules apply as for a `DataFrame` input: this means in particular that selected columns will be copied. If `copycols=false`, a `SubDataFrame` is returned without copying columns. + + If a `GroupedDataFrame` is passed, a separate task is spawned for each + specified transformation, allowing for parallel operation when several + transformations are requested and Julia was started with more than one thread. 
""" """ diff --git a/test/grouping.jl b/test/grouping.jl index 169e6a67bd..e4be18df8d 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3274,14 +3274,17 @@ end end @testset "permutations of operations with combine" begin + Random.seed!(1) df = DataFrame(id=rand(1:10, 20)) - gd = groupby(df, :id) + gd = groupby_checked(df, :id) + trans = [:id => (y -> sum(y)) => :v1, :id => (y -> 10maximum(y)) => :v2, - y -> (v3=100y.id[1],), - y -> (v4=fill(1000y.id[1],y.id[1]+1),)] + :id => sum => :v3, + y -> (v4=100y.id[1],), + y -> (v5=fill(1000y.id[1],y.id[1]+1),)] - for p in permutations(1:4), i in 1:4 + for p in permutations(1:length(trans)), i in 1:length(trans) res = combine(gd, trans[p[1:i]]...) for j in 1:i expected = nrow(res) <= 10 ? combine(gd, trans[p[j]]) : @@ -3291,6 +3294,24 @@ end @test res[!, nms] == expected[!, nms] end end + + trans = [:id => (y -> sum(y)) => :v1, + :id => (y -> 10maximum(y)) => :v2, + :id => sum => :v3, + y -> (v4=100y.id[1],), + y -> (v5=1000 .* y.id[1],), + :id => :v6] + + for p in permutations(1:length(trans)), i in 1:length(trans) + res = combine(gd, trans[p[1:i]]...) + for j in 1:i + expected = nrow(res) <= 10 ? 
combine(gd, trans[p[j]]) : + # Second operation is there only to generate as many rows as in res + combine(gd, trans[p[j]], y -> (xxx=1000 .* y.id,)) + nms = intersect(names(expected), names(res)) + @test res[!, nms] == expected[!, nms] + end + end end end # module From 4ceddf5ff49ded3ea01b9019f3b5d0123954c06f Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sun, 17 Jan 2021 20:08:07 +0100 Subject: [PATCH 18/22] Add mention in the manual --- docs/src/man/split_apply_combine.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/src/man/split_apply_combine.md b/docs/src/man/split_apply_combine.md index 09ab0f6e23..99fa6c7e46 100644 --- a/docs/src/man/split_apply_combine.md +++ b/docs/src/man/split_apply_combine.md @@ -122,6 +122,10 @@ It is allowed to mix single values and vectors if multiple transformations are requested. In this case single value will be repeated to match the length of columns specified by returned vectors. +A separate task is spawned for each specified transformation, allowing for +parallel operation when several transformations are requested and Julia was +started with more than one thread. + To apply `function` to each row instead of whole columns, it can be wrapped in a `ByRow` struct. `cols` can be any column indexing syntax, in which case `function` will be passed one argument for each of the columns specified by @@ -130,7 +134,7 @@ If `ByRow` is used it is allowed for `cols` to select an empty set of columns, in which case `function` is called for each row without any arguments and an empty `NamedTuple` is passed if empty set of columns is wrapped in `AsTable`. 
-There the following keyword arguments are supported by the transformation functions +The following keyword arguments are supported by the transformation functions (not all keyword arguments are supported in all cases; in general they are allowed in situations when they are meaningful, see the documentation of the specific functions for details): From a0ce3649efa76cf65f12ab3d9c338487c03d514d Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sun, 17 Jan 2021 20:48:28 +0100 Subject: [PATCH 19/22] Fix Julia 1.0 --- src/groupeddataframe/fastaggregates.jl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index 9d14d7345d..ec00216732 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -163,12 +163,13 @@ function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Boo counts = zeros(Int, n) end groups = gd.groups - if VERSION >= v"1.4" && Threads.nthreads() > 1 - batchsize = 100_000 + @static if VERSION >= v"1.4" + batchsize = Threads.nthreads() > 1 ? 
100_000 : typemax(Int) + batches = Iterators.partition(eachindex(incol, groups), batchsize) else - batchsize = typemax(Int) + batches = (eachindex(incol, groups),) end - for batch in Iterators.partition(eachindex(incol, groups), batchsize) + for batch in batches # Allow other tasks to do garbage collection while this one runs @static if VERSION >= v"1.4" GC.safepoint() From c7b3ceea00632b19bbf3fad73de302d76c56ce72 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 20 Jan 2021 22:31:07 +0100 Subject: [PATCH 20/22] Add comment --- src/groupeddataframe/splitapplycombine.jl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 4414ec43aa..d1f81b932c 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -539,6 +539,11 @@ function _combine(gd::GroupedDataFrame, tasks = similar(cs_norm, Task) parentdf = parent(gd) + # Operations are run in separate tasks, except two parts: + # - the first task that needs idx_agg computes it; + # a lock ensures others wait for it to complete + # - once all tasks are done, they need sequential postprocessing + # since their order affects that of columns for i in eachindex(cs_norm, optional_transform, tasks) cs_i = cs_norm[i] optional_i = optional_transform[i] From e1a0222606ed0fa72f6199a53922cd871da76a86 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 20 Jan 2021 22:43:41 +0100 Subject: [PATCH 21/22] Improve docs --- docs/src/man/split_apply_combine.md | 4 +++- src/abstractdataframe/selection.jl | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/src/man/split_apply_combine.md b/docs/src/man/split_apply_combine.md index 99fa6c7e46..b70509d66c 100644 --- a/docs/src/man/split_apply_combine.md +++ b/docs/src/man/split_apply_combine.md @@ -124,7 +124,9 @@ of columns specified by returned vectors. 
A separate task is spawned for each specified transformation, allowing for parallel operation when several transformations are requested and Julia was -started with more than one thread. +started with more than one thread. Passed transformation functions should +therefore either be pure (i.e. not modify global variables) or use +locks to control parallel accesses. To apply `function` to each row instead of whole columns, it can be wrapped in a `ByRow` struct. `cols` can be any column indexing syntax, in which case diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index 55c7bbb14d..9f2eb701a7 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -147,6 +147,10 @@ const TRANSFORMATION_COMMON_RULES = If a `GroupedDataFrame` is passed, a separate task is spawned for each specified transformation, allowing for parallel operation when several transformations are requested and Julia was started with more than one thread. + Passed transformation functions should therefore either be pure (i.e. not + modify global variables) or use locks to control parallel accesses. + In the future, parallelism may be extended to other cases, so this requirement + also holds for `DataFrame` inputs.
""" """ From 76a7a21b88b9da6422a73b340584a43976a87720 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 20 Jan 2021 22:51:00 +0100 Subject: [PATCH 22/22] Use multiple threads on CI, warn if not --- .github/workflows/ci.yml | 2 ++ test/runtests.jl | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e06faac016..469634233f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,6 +44,8 @@ jobs: ${{ runner.os }}- - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 + env: + JULIA_NUM_THREADS: 4 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v1 with: diff --git a/test/runtests.jl b/test/runtests.jl index fec6c8d6e6..ca9e1fd87e 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -8,6 +8,10 @@ anyerrors = false using DataFrames, Dates, Test, Random +if VERSION > v"1.3" && Threads.nthreads() < 2 + @warn("Running with only one thread: correctness of parallel operations is not tested") +end + my_tests = ["utils.jl", "cat.jl", "data.jl",