Add kwarg to filter columns #412

Open · wants to merge 6 commits into main
8 changes: 8 additions & 0 deletions docs/src/manual.md
@@ -46,6 +46,14 @@ using Arrow
table = Arrow.Table("data.arrow")
```

Optionally, the `filtercolumns` keyword argument selects which columns to load:

```julia
table = Arrow.Table("data.arrow"; filtercolumns = ["col1", "col3"])
```

This can be a significant performance improvement when the data is compressed and some columns are not needed.
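
Per the changes to `Stream` in this pull request, the same keyword should also be usable when streaming record batches; a minimal sketch (the file and column names are placeholders, and the public `Arrow.Stream` API is assumed to forward the keyword):

```julia
using Arrow

# Sketch: load only the selected columns from each record batch in a stream.
for batch in Arrow.Stream("data.arrow"; filtercolumns = ["col1", "col3"])
    col1 = batch.col1  # only "col1" and "col3" are materialized
end
```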

### `Arrow.Table`

The type of `table` in this example will be an `Arrow.Table`. When "reading" the arrow data, `Arrow.Table` first ["mmapped"](https://en.wikipedia.org/wiki/Mmap) the `data.arrow` file, which is an important technique for dealing with data larger than available RAM on a system. By "mmapping" a file, the OS doesn't actually load the entire file contents into RAM at the same time, but file contents are "swapped" into RAM as different regions of a file are requested. Once "mmapped", `Arrow.Table` then inspected the metadata in the file to determine the number of columns, their names and types, at which byte offset each column begins in the file data, and even how many "batches" are included in this file (arrow tables may be partitioned into one or more "record batches" each containing portions of the data). Armed with all the appropriate metadata, `Arrow.Table` then created custom array objects ([`Arrow.ArrowVector`](@ref)), which act as "views" into the raw arrow memory bytes. This is a significant point in that no extra memory is allocated for "data" when reading arrow data. This is in contrast to reading data from a csv file as columns into Julia structures, where we would need to allocate those array structures ourselves, then parse the file, "filling in" each element of the array with the data we parsed from the file. Arrow data, on the other hand, is *already laid out in memory or on disk* in a binary format, and as long as we have the metadata to interpret the raw bytes, we can figure out whether to treat those bytes as a `Vector{Float64}`, etc. A sample of the kinds of arrow array types you might see when deserializing arrow data includes:
149 changes: 145 additions & 4 deletions src/table.jl
@@ -74,9 +74,14 @@ mutable struct Stream
dictencoded::Dict{Int64,Meta.Field} # dictionary id => field
convert::Bool
compression::Ref{Union{Symbol,Nothing}}
filtercolumns::Union{Nothing,Vector{String}}
end

function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
function Stream(
inputs::Vector{ArrowBlob};
convert::Bool=true,
filtercolumns::Union{Nothing,Vector{String}}=nothing,
)
inputindex = 1
batchiterator = nothing
names = Symbol[]
@@ -96,6 +101,7 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
dictencoded,
convert,
compression,
filtercolumns,
)
end

@@ -187,6 +193,7 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
# assert endianness?
# store custom_metadata?
for (i, field) in enumerate(x.schema.fields)
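# when `filtercolumns` is given, skip any field whose name is not selected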
isnothing(x.filtercolumns) || field.name in x.filtercolumns || continue
push!(x.names, Symbol(field.name))
push!(
x.types,
@@ -255,7 +262,8 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
if header.compression !== nothing
compression = header.compression
end
for vec in VectorIterator(x.schema, batch, x.dictencodings, x.convert)
for vec in
vectoriterator(x.schema, batch, x.dictencodings, x.convert; x.filtercolumns)
push!(columns, vec)
end
break
@@ -277,6 +285,7 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
lookup = Dict{Symbol,AbstractVector}()
types = Type[]
for (nm, col) in zip(x.names, columns)
isnothing(x.filtercolumns) || String(nm) in x.filtercolumns || continue
lookup[nm] = col
push!(types, eltype(col))
end
@@ -412,7 +421,11 @@ Table(inputs::Vector; kw...) =
Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)

# will detect whether we're reading a Table from a file or stream
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
function Table(
blobs::Vector{ArrowBlob};
convert::Bool=true,
filtercolumns::Union{Nothing,Vector{String}}=nothing,
)
t = Table()
sch = nothing
dictencodings = Dict{Int64,DictEncoding}() # dictionary id => DictEncoding
@@ -448,6 +461,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
# store custom_metadata?
if sch === nothing
for (i, field) in enumerate(header.fields)
isnothing(filtercolumns) || field.name in filtercolumns || continue
push!(names(t), Symbol(field.name))
# recursively find any dictionaries for any fields
getdictionaries!(dictencoded, field)
@@ -469,6 +483,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
if haskey(dictencodings, id) && header.isDelta
# delta
field = dictencoded[id]
isnothing(filtercolumns) || field.name in filtercolumns || continue
values, _, _ = build(
field,
field.type,
@@ -523,7 +538,9 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
anyrecordbatches = true
@debugv 1 "parsing record batch message: compression = $(header.compression)"
@wkspawn begin
cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
cols = collect(
vectoriterator(sch, $batch, dictencodings, convert; filtercolumns),
)
put!(() -> put!(tsks, cols), sync, $(rbi))
end
rbi += 1
@@ -539,11 +556,13 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
# 158; some implementations may send 0 record batches
if !anyrecordbatches && !isnothing(sch)
for field in sch.fields
isnothing(filtercolumns) || field.name in filtercolumns || continue
T = juliaeltype(field, buildmetadata(field), convert)
push!(columns(t), T[])
end
end
for (nm, col) in zip(names(t), columns(t))
isnothing(filtercolumns) || String(nm) in filtercolumns || continue
lu[nm] = col
push!(ty, eltype(col))
end
@@ -607,6 +626,13 @@ function Base.iterate(x::BatchIterator, (pos, id)=(x.startpos, 0))
return Batch(msg, x.bytes, pos, id), (pos + msg.bodyLength, id + 1)
end

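# Dispatch helper added in this PR: with no `filtercolumns` fall back to the plain
# `VectorIterator`; otherwise build only the requested columns via `FilteredVectorIterator`.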
function vectoriterator(schema, batch, de, convert; filtercolumns=nothing)
(
isnothing(filtercolumns) ? VectorIterator(schema, batch, de, convert) :
FilteredVectorIterator(schema, batch, de, convert, filtercolumns)
)
end

struct VectorIterator
schema::Meta.Schema
batch::Batch # batch.msg.header MUST BE RecordBatch
@@ -642,6 +668,50 @@ end

Base.length(x::VectorIterator) = length(x.schema.fields)

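# Like `VectorIterator`, but materializes only the fields named in `filtercolumns`;
# skipped fields are stepped over with `_step` so node/buffer indices stay in sync.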
struct FilteredVectorIterator
schema::Meta.Schema
batch::Batch # batch.msg.header MUST BE RecordBatch
dictencodings::Dict{Int64,DictEncoding}
convert::Bool
filtercolumns::Vector{String}
end

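# the number of yielded columns depends on how many schema fields match `filtercolumns`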
Base.IteratorSize(::Type{FilteredVectorIterator}) = Base.SizeUnknown()

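# Advance `(columnidx, nodeidx, bufferidx)` past any fields not in `filtercolumns`,
# returning `nothing` once all schema fields have been visited.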
function _nextstate(x::FilteredVectorIterator, state)
columnidx, nodeidx, bufferidx = state
columnidx > length(x.schema.fields) && return
field = x.schema.fields[columnidx]
while !(field.name in x.filtercolumns)
nodeidx, bufferidx = Arrow._step(field, nodeidx, bufferidx)
columnidx += 1
columnidx > length(x.schema.fields) && return
field = x.schema.fields[columnidx]
end
(columnidx, nodeidx, bufferidx)
end

function Base.iterate(x::FilteredVectorIterator, state=(Int64(1), Int64(1), Int64(1)))
state = _nextstate(x, state)
isnothing(state) && return
(columnidx, nodeidx, bufferidx) = state
field = x.schema.fields[columnidx]
@debugv 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
A, nodeidx, bufferidx = build(
field,
x.batch,
x.batch.msg.header,
x.dictencodings,
nodeidx,
bufferidx,
x.convert,
)
@debugv 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
@debugv 3 A
columnidx += 1
return A, (columnidx, nodeidx, bufferidx)
end

const ListTypes =
Union{Meta.Utf8,Meta.LargeUtf8,Meta.Binary,Meta.LargeBinary,Meta.List,Meta.LargeList}
const LargeLists = Union{Meta.LargeUtf8,Meta.LargeBinary,Meta.LargeList}
@@ -924,3 +994,74 @@ function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, c
T = juliaeltype(f, meta, convert)
return BoolVector{T}(decodedbytes, pos, validity, len, meta), nodeidx + 1, bufferidx + 1
end

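# `_step` mirrors the node/buffer accounting done by `build` for each field type,
# advancing the indices past a column without materializing it (used when a column
# is excluded by `filtercolumns`).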
_step(::Meta.Field, ::L, nodeidx, bufferidx) where {L} = nodeidx + 1, bufferidx + 2
_step(::Meta.Field, ::Meta.Bool, nodeidx, bufferidx) = nodeidx + 1, bufferidx + 2

function _step(field::Meta.Field, nodeidx, bufferidx)
if field.dictionary !== nothing
return nodeidx + 1, bufferidx + 2
else
return _step(field, field.type, nodeidx, bufferidx)
end
end

function _step(f::Meta.Field, L::ListTypes, nodeidx, bufferidx)
bufferidx += 2
nodeidx += 1
if L isa Meta.Utf8 ||
L isa Meta.LargeUtf8 ||
L isa Meta.Binary ||
L isa Meta.LargeBinary
bufferidx += 1
else
nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx)
end
nodeidx, bufferidx
end

function _step(
f::Meta.Field,
L::Union{Meta.FixedSizeBinary,Meta.FixedSizeList},
nodeidx,
bufferidx,
)
bufferidx += 1
nodeidx += 1
if L isa Meta.FixedSizeBinary
bufferidx += 1
else
nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx)
end
nodeidx, bufferidx
end

function _step(f::Meta.Field, ::Meta.Map, nodeidx, bufferidx)
bufferidx += 2
nodeidx += 1
nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx)
nodeidx, bufferidx
end

function _step(f::Meta.Field, ::Meta.Struct, nodeidx, bufferidx)
bufferidx += 1
nodeidx += 1
for child in f.children
nodeidx, bufferidx = _step(child, nodeidx, bufferidx)
end
nodeidx, bufferidx
end

function _step(f::Meta.Field, L::Meta.Union, nodeidx, bufferidx)
bufferidx += 1
if L.mode == Meta.UnionModes.Dense
bufferidx += 1
end
nodeidx += 1
for child in f.children
nodeidx, bufferidx = _step(child, nodeidx, bufferidx)
end
nodeidx, bufferidx
end

_step(::Meta.Field, ::Meta.Null, nodeidx, bufferidx) = nodeidx + 1, bufferidx