Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable multithreading with several operations in combine/select/transform #2574

Merged
merged 23 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Tables = "1.2"
julia = "1"

[extras]
Combinatorics = "861a8166-3701-5b0c-9a16-15d98fcdc6aa"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
DataValues = "e7dc6d0d-1eca-5fa6-8ad6-5aecde8b7ea5"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Expand All @@ -47,4 +48,4 @@ Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["DataStructures", "DataValues", "Dates", "Logging", "Random", "Test"]
test = ["Combinatorics", "DataStructures", "DataValues", "Dates", "Logging", "Random", "Test"]
6 changes: 5 additions & 1 deletion docs/src/man/split_apply_combine.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ It is allowed to mix single values and vectors if multiple transformations
are requested. In this case single value will be repeated to match the length
of columns specified by returned vectors.

A separate task is spawned for each specified transformation, allowing for
parallel operation when several transformations are requested and Julia was
started with more than one thread.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add that this means that the transformation functions passed should not modify the same state of the Julia program?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I've also mentioned that this may be extended in the future so that people are not caught by surprise.


To apply `function` to each row instead of whole columns, it can be wrapped in a
`ByRow` struct. `cols` can be any column indexing syntax, in which case
`function` will be passed one argument for each of the columns specified by
Expand All @@ -130,7 +134,7 @@ If `ByRow` is used it is allowed for `cols` to select an empty set of columns,
in which case `function` is called for each row without any arguments and an
empty `NamedTuple` is passed if empty set of columns is wrapped in `AsTable`.

There the following keyword arguments are supported by the transformation functions
The following keyword arguments are supported by the transformation functions
(not all keyword arguments are supported in all cases; in general they are allowed
in situations when they are meaningful, see the documentation of the specific functions
for details):
Expand Down
19 changes: 18 additions & 1 deletion src/DataFrames.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module DataFrames
using Statistics, Printf, REPL
using Reexport, SortingAlgorithms, Compat, Unicode, PooledArrays
@reexport using Missings, InvertedIndices
using Base.Sort, Base.Order, Base.Iterators
using Base.Sort, Base.Order, Base.Iterators, Base.Threads
using TableTraits, IteratorInterfaceExtensions
import LinearAlgebra: norm
using Markdown
Expand Down Expand Up @@ -93,6 +93,23 @@ else
export only
end

if VERSION >= v"1.3"
using Base.Threads: @spawn
else
# This is the definition of @async in Base
macro spawn(expr)
thunk = esc(:(()->($expr)))
var = esc(Base.sync_varname)
quote
local task = Task($thunk)
if $(Expr(:isdefined, var))
push!($var, task)
end
schedule(task)
end
end
end

include("other/utils.jl")
include("other/index.jl")

Expand Down
4 changes: 4 additions & 0 deletions src/abstractdataframe/selection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ const TRANSFORMATION_COMMON_RULES =
returned and the same copying rules apply as for a `DataFrame` input: this
means in particular that selected columns will be copied. If
`copycols=false`, a `SubDataFrame` is returned without copying columns.

If a `GroupedDataFrame` is passed, a separate task is spawned for each
specified transformation, allowing for parallel operation when several
transformations are requested and Julia was started with more than one thread.
"""

"""
Expand Down
37 changes: 25 additions & 12 deletions src/groupeddataframe/fastaggregates.jl
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,31 @@ function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Boo
counts = zeros(Int, n)
end
groups = gd.groups
@inbounds for i in eachindex(incol, groups)
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if U is not Any
if eltype(res) === Any && !isassigned(res, gix)
res[gix] = f(x, gix)
else
res[gix] = op(res[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts[gix] += 1
@static if VERSION >= v"1.4"
batchsize = Threads.nthreads() > 1 ? 100_000 : typemax(Int)
batches = Iterators.partition(eachindex(incol, groups), batchsize)
else
batches = (eachindex(incol, groups),)
end
for batch in batches
# Allow other tasks to do garbage collection while this one runs
@static if VERSION >= v"1.4"
GC.safepoint()
end

@inbounds for i in batch
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if U is not Any
if eltype(res) === Any && !isassigned(res, gix)
res[gix] = f(x, gix)
else
res[gix] = op(res[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts[gix] += 1
end
end
end
end
Expand Down
Loading