From e4ff840b2219d9213d019987cf5ddade2f75059b Mon Sep 17 00:00:00 2001 From: Joao Aparicio Date: Sat, 1 Apr 2023 17:38:21 -0500 Subject: [PATCH 1/4] Add kwarg to filter columns Currently we don't have the option to load just a subset of the columns. This matters e.g. when compression is the bottleneck. For example, create a compressed arrow file. ```julia using Arrow p = tempname(); N = 1000000 tbl = ( a=rand(N), b=rand(N), c=rand(N), d=rand(N), e=rand(N), f=[rand(rand(0:100)) for _ in 1:N], ); Arrow.write(p, tbl; compress=:zstd); ``` Column `f` is the longest - it has an expected 50*N elements vs N for the rest Some times we only care for some of the other columns. Currently we must decompress all columns regardless: ```julia using BenchmarkTools @btime tbl = Arrow.Table(p); # 359.205 ms (530 allocations: 794.23 MiB) ``` With this commit we can load only some of the columns ```julia @btime tbl = Arrow.Table(p; filtercolumns=["a"]); # 6.146 ms (231 allocations: 14.33 MiB) ``` --- src/table.jl | 132 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 127 insertions(+), 5 deletions(-) diff --git a/src/table.jl b/src/table.jl index b1695e9b..1a0b1b38 100644 --- a/src/table.jl +++ b/src/table.jl @@ -66,9 +66,14 @@ mutable struct Stream dictencoded::Dict{Int64, Meta.Field} # dictionary id => field convert::Bool compression::Ref{Union{Symbol,Nothing}} + filtercolumns::Union{Nothing,Vector{String}} end -function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true) +function Stream( + inputs::Vector{ArrowBlob}; + convert::Bool=true, + filtercolumns::Union{Nothing,Vector{String}}=nothing, +) inputindex = 1 batchiterator = nothing names = Symbol[] @@ -77,7 +82,7 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true) dictencodings = Dict{Int64, DictEncoding}() dictencoded = Dict{Int64, Meta.Field}() compression = Ref{Union{Symbol,Nothing}}(nothing) - Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression) + Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression, filtercolumns) end Stream(input, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...) @@ -152,6 +157,7 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0)) # assert endianness? # store custom_metadata? for (i, field) in enumerate(x.schema.fields) + isnothing(x.filtercolumns) || field.name in x.filtercolumns || continue push!(x.names, Symbol(field.name)) push!(x.types, juliaeltype(field, buildmetadata(field.custom_metadata), x.convert)) # recursively find any dictionaries for any fields @@ -188,7 +194,7 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0)) if header.compression !== nothing compression = header.compression end - for vec in VectorIterator(x.schema, batch, x.dictencodings, x.convert) + for vec in vectoriterator(x.schema, batch, x.dictencodings, x.convert; x.filtercolumns) push!(columns, vec) end break @@ -210,6 +216,7 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0)) lookup = Dict{Symbol, AbstractVector}() types = Type[] for (nm, col) in zip(x.names, columns) + isnothing(x.filtercolumns) || String(nm) in x.filtercolumns || continue lookup[nm] = col push!(types, eltype(col)) end @@ -292,7 +299,11 @@ Table(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Table([ArrowBl Table(inputs::Vector; kw...) = Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...) # will detect whether we're reading a Table from a file or stream -function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) +function Table( + blobs::Vector{ArrowBlob}; + convert::Bool=true, + filtercolumns::Union{Nothing,Vector{String}}=nothing, +) t = Table() sch = nothing dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding @@ -334,6 +345,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) # store custom_metadata? if sch === nothing for (i, field) in enumerate(header.fields) + isnothing(filtercolumns) || field.name in filtercolumns || continue push!(names(t), Symbol(field.name)) # recursively find any dictionaries for any fields getdictionaries!(dictencoded, field) @@ -351,6 +363,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) if haskey(dictencodings, id) && header.isDelta # delta field = dictencoded[id] + isnothing(filtercolumns) || field.name in filtercolumns || continue values, _, _ = build(field, field.type, batch, recordbatch, dictencodings, Int64(1), Int64(1), convert) dictencoding = dictencodings[id] if typeof(dictencoding.data) <: ChainedVector @@ -373,7 +386,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) anyrecordbatches = true @debugv 1 "parsing record batch message: compression = $(header.compression)" Threads.@spawn begin - cols = collect(VectorIterator(sch, $batch, dictencodings, convert)) + cols = collect(vectoriterator(sch, $batch, dictencodings, convert; filtercolumns)) put!(() -> put!(tsks, cols), sync, $(rbi)) end rbi += 1 @@ -389,11 +402,13 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) # 158; some implementations may send 0 record batches if !anyrecordbatches && !isnothing(sch) for field in sch.fields + isnothing(filtercolumns) || field.name in filtercolumns || continue T = juliaeltype(field, buildmetadata(field), convert) push!(columns(t), T[]) end end for (nm, col) in zip(names(t), columns(t)) + isnothing(filtercolumns) || String(nm) in filtercolumns || continue lu[nm] = col push!(ty, eltype(col)) end @@ -455,6 +470,14 @@ function Base.iterate(x::BatchIterator, (pos, id)=(x.startpos, 0)) return Batch(msg, x.bytes, pos, id), (pos + msg.bodyLength, id + 1) end +function vectoriterator(schema, batch, de, convert; filtercolumns=nothing) + ( + isnothing(filtercolumns) ? + VectorIterator(schema, batch, de, convert) : + FilteredVectorIterator(schema, batch, de, convert, filtercolumns) + ) +end + struct VectorIterator schema::Meta.Schema batch::Batch # batch.msg.header MUST BE RecordBatch @@ -479,6 +502,42 @@ end Base.length(x::VectorIterator) = length(x.schema.fields) +struct FilteredVectorIterator + schema::Meta.Schema + batch::Batch # batch.msg.header MUST BE RecordBatch + dictencodings::Dict{Int64, DictEncoding} + convert::Bool + filtercolumns::Vector{String} +end + +Base.IteratorSize(::Type{FilteredVectorIterator}) = Base.SizeUnknown() + +function _nextstate(x::FilteredVectorIterator, state) + columnidx, nodeidx, bufferidx = state + columnidx > length(x.schema.fields) && return + field = x.schema.fields[columnidx] + while !(field.name in x.filtercolumns) + nodeidx, bufferidx = Arrow._step(field, nodeidx, bufferidx) + columnidx += 1 + columnidx > length(x.schema.fields) && return + field = x.schema.fields[columnidx] + end + (columnidx, nodeidx, bufferidx) +end + +function Base.iterate(x::FilteredVectorIterator, state=(Int64(1), Int64(1), Int64(1))) + state = _nextstate(x, state) + isnothing(state) && return + (columnidx, nodeidx, bufferidx) = state + field = x.schema.fields[columnidx] + @debugv 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx" + A, nodeidx, bufferidx = build(field, x.batch, x.batch.msg.header, x.dictencodings, nodeidx, bufferidx, x.convert) + @debugv 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx" + @debugv 3 A + columnidx += 1 + return A, (columnidx, nodeidx, bufferidx) +end + const ListTypes = Union{Meta.Utf8, Meta.LargeUtf8, Meta.Binary, Meta.LargeBinary, Meta.List, Meta.LargeList} const LargeLists = Union{Meta.LargeUtf8, Meta.LargeBinary, Meta.LargeList} @@ -715,3 +774,66 @@ function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, c T = juliaeltype(f, meta, convert) return BoolVector{T}(decodedbytes, pos, validity, len, meta), nodeidx + 1, bufferidx + 1 end + +_step(::Meta.Field, ::L, nodeidx, bufferidx) where {L} = nodeidx + 1, bufferidx + 2 +_step(::Meta.Field, ::Meta.Bool, nodeidx, bufferidx) = nodeidx + 1, bufferidx + 2 + +function _step(field::Meta.Field, nodeidx, bufferidx) + if field.dictionary !== nothing + return nodeidx+1, bufferidx+2 + else + return _step(field, field.type, nodeidx, bufferidx) + end +end + +function _step(f::Meta.Field, L::ListTypes, nodeidx, bufferidx) + bufferidx += 2 + nodeidx += 1 + if L isa Meta.Utf8 || L isa Meta.LargeUtf8 || L isa Meta.Binary || L isa Meta.LargeBinary + bufferidx += 1 + else + nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx) + end + nodeidx, bufferidx +end + +function _step(f::Meta.Field, L::Union{Meta.FixedSizeBinary, Meta.FixedSizeList}, nodeidx, bufferidx) + bufferidx += 1 + nodeidx += 1 + if L isa Meta.FixedSizeBinary + bufferidx += 1 + else + nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx) + end + nodeidx, bufferidx +end + +function _step(f::Meta.Field, ::Meta.Map, nodeidx, bufferidx) + bufferidx += 2 + nodeidx += 1 + nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx) + nodeidx, bufferidx +end + +function _step(f::Meta.Field, ::Meta.Struct, nodeidx, bufferidx) + bufferidx += 1 + nodeidx += 1 + for child in f.children + nodeidx, bufferidx = _step(child, nodeidx, bufferidx) + end + nodeidx, bufferidx +end + +function _step(f::Meta.Field, L::Meta.Union, nodeidx, bufferidx) + bufferidx += 1 + if L.mode == Meta.UnionModes.Dense + bufferidx += 1 + end + nodeidx += 1 + for child in f.children + nodeidx, bufferidx = _step(child, nodeidx, bufferidx) + end + nodeidx, bufferidx +end + +_step(::Meta.Field, ::Meta.Null, nodeidx, bufferidx) = nodeidx + 1, bufferidx \ No newline at end of file From 9322cd5cd90212f92083d1e8ab0c4c4705039380 Mon Sep 17 00:00:00 2001 From: Joao Aparicio Date: Fri, 3 Nov 2023 17:44:00 -0500 Subject: [PATCH 2/4] format --- src/table.jl | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/src/table.jl b/src/table.jl index a7a0b885..49b44c59 100644 --- a/src/table.jl +++ b/src/table.jl @@ -262,7 +262,8 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0)) if header.compression !== nothing compression = header.compression end - for vec in vectoriterator(x.schema, batch, x.dictencodings, x.convert; x.filtercolumns) + for vec in + vectoriterator(x.schema, batch, x.dictencodings, x.convert; x.filtercolumns) push!(columns, vec) end break @@ -537,7 +538,9 @@ function Table( anyrecordbatches = true @debugv 1 "parsing record batch message: compression = $(header.compression)" @wkspawn begin - cols = collect(vectoriterator(sch, $batch, dictencodings, convert; filtercolumns)) + cols = collect( + vectoriterator(sch, $batch, dictencodings, convert; filtercolumns), + ) put!(() -> put!(tsks, cols), sync, $(rbi)) end rbi += 1 @@ -625,8 +628,7 @@ end function vectoriterator(schema, batch, de, convert; filtercolumns=nothing) ( - isnothing(filtercolumns) ? - VectorIterator(schema, batch, de, convert) : + isnothing(filtercolumns) ? VectorIterator(schema, batch, de, convert) : FilteredVectorIterator(schema, batch, de, convert, filtercolumns) ) end @@ -669,7 +671,7 @@ Base.length(x::VectorIterator) = length(x.schema.fields) struct FilteredVectorIterator schema::Meta.Schema batch::Batch # batch.msg.header MUST BE RecordBatch - dictencodings::Dict{Int64, DictEncoding} + dictencodings::Dict{Int64,DictEncoding} convert::Bool filtercolumns::Vector{String} end @@ -695,7 +697,15 @@ function Base.iterate(x::FilteredVectorIterator, state=(Int64(1), Int64(1), Int6 (columnidx, nodeidx, bufferidx) = state field = x.schema.fields[columnidx] @debugv 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx" - A, nodeidx, bufferidx = build(field, x.batch, x.batch.msg.header, x.dictencodings, nodeidx, bufferidx, x.convert) + A, nodeidx, bufferidx = build( + field, + x.batch, + x.batch.msg.header, + x.dictencodings, + nodeidx, + bufferidx, + x.convert, + ) @debugv 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx" @debugv 3 A columnidx += 1 @@ -990,7 +1000,7 @@ _step(::Meta.Field, ::Meta.Bool, nodeidx, bufferidx) = nodeidx + 1, bufferidx + function _step(field::Meta.Field, nodeidx, bufferidx) if field.dictionary !== nothing - return nodeidx+1, bufferidx+2 + return nodeidx + 1, bufferidx + 2 else return _step(field, field.type, nodeidx, bufferidx) end @@ -999,7 +1009,10 @@ end function _step(f::Meta.Field, L::ListTypes, nodeidx, bufferidx) bufferidx += 2 nodeidx += 1 - if L isa Meta.Utf8 || L isa Meta.LargeUtf8 || L isa Meta.Binary || L isa Meta.LargeBinary + if L isa Meta.Utf8 || + L isa Meta.LargeUtf8 || + L isa Meta.Binary || + L isa Meta.LargeBinary bufferidx += 1 else nodeidx, bufferidx = _step(f.children[1], nodeidx, bufferidx) @@ -1007,7 +1020,12 @@ function _step(f::Meta.Field, L::ListTypes, nodeidx, bufferidx) nodeidx, bufferidx end -function _step(f::Meta.Field, L::Union{Meta.FixedSizeBinary, Meta.FixedSizeList}, nodeidx, bufferidx) +function _step( + f::Meta.Field, + L::Union{Meta.FixedSizeBinary,Meta.FixedSizeList}, + nodeidx, + bufferidx, +) bufferidx += 1 nodeidx += 1 if L isa Meta.FixedSizeBinary From 7f3ef776a808b3c2d54713c87a532b02c5c18993 Mon Sep 17 00:00:00 2001 From: Joao Aparicio Date: Fri, 3 Nov 2023 18:05:41 -0500 Subject: [PATCH 3/4] typo --- src/table.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/table.jl b/src/table.jl index 49b44c59..71bb7eb5 100644 --- a/src/table.jl +++ b/src/table.jl @@ -101,7 +101,7 @@ function Stream( dictencoded, convert, compression, - filtercolumnns, + filtercolumns, ) end From bc9169edf9e48409f8023c854e1b6d45d780008b Mon Sep 17 00:00:00 2001 From: Joao Aparicio Date: Fri, 3 Nov 2023 19:02:17 -0500 Subject: [PATCH 4/4] update manual --- docs/src/manual.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/src/manual.md b/docs/src/manual.md index cb7e433b..158fc343 100644 --- a/docs/src/manual.md +++ b/docs/src/manual.md @@ -46,6 +46,14 @@ using Arrow table = Arrow.Table("data.arrow") ``` +Optionally the `filtercolumns` keyword argument will select which columns to load: + +``` +table = Arrow.Table("data.arrow"; filtercolumns = ["col1", "col3"]) +``` + +This can be a significant performance improvement when the data is compressed and some columns are not needed. + ### `Arrow.Table` The type of `table` in this example will be an `Arrow.Table`. When "reading" the arrow data, `Arrow.Table` first ["mmapped"](https://en.wikipedia.org/wiki/Mmap) the `data.arrow` file, which is an important technique for dealing with data larger than available RAM on a system. By "mmapping" a file, the OS doesn't actually load the entire file contents into RAM at the same time, but file contents are "swapped" into RAM as different regions of a file are requested. Once "mmapped", `Arrow.Table` then inspected the metadata in the file to determine the number of columns, their names and types, at which byte offset each column begins in the file data, and even how many "batches" are included in this file (arrow tables may be partitioned into one or more "record batches" each containing portions of the data). Armed with all the appropriate metadata, `Arrow.Table` then created custom array objects ([`Arrow.ArrowVector`](@ref)), which act as "views" into the raw arrow memory bytes. This is a significant point in that no extra memory is allocated for "data" when reading arrow data. This is in contrast to if we wanted to read data from a csv file as columns into Julia structures; we would need to allocate those array structures ourselves, then parse the file, "filling in" each element of the array with the data we parsed from the file. Arrow data, on the other hand, is *already laid out in memory or on disk* in a binary format, and as long as we have the metadata to interpret the raw bytes, we can figure out whether to treat those bytes as a `Vector{Float64}`, etc. A sample of the kinds of arrow array types you might see when deserializing arrow data, include: