Deal with encoding within each page (#317)
Moelf authored Mar 16, 2024
1 parent 2d2acf9 commit 63c64cf
Showing 2 changed files with 74 additions and 83 deletions.
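The change in a nutshell: instead of each `read_field` method re-deriving the split/zigzag/delta flags from the column type number and post-processing the whole column afterwards, the `ColumnRecord` is now handed to `read_pagedesc` directly, which decodes page by page. For orientation, `_from_zigzag!` is called but never shown in this diff; under the assumption that RNTuple uses the standard zigzag mapping, a generic decode looks like the sketch below (not UnROOT's actual helper):

```julia
# Generic zigzag decode sketch (assumes the standard mapping
# 0, -1, 1, -2, 2, ... -> 0, 1, 2, 3, 4, ...); UnROOT's _from_zigzag!
# may differ in detail.
function from_zigzag!(res::AbstractVector{<:Signed})
    @inbounds for i in eachindex(res)
        z = res[i]
        # logical right shift by one, then flip all bits iff the low bit was set
        res[i] = xor(z >>> 1, -(z & one(z)))
    end
    return res
end

from_zigzag!(Int32[0, 1, 2, 3, 4])  # -> [0, -1, 1, -2, 2]
```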
113 changes: 74 additions & 39 deletions src/RNTuple/fieldcolumn_reading.jl
@@ -38,15 +38,15 @@ end

_field_output_type(::Type{StringField{O, T}}) where {O, T} = Vector{String}
function read_field(io, field::StringField{O, T}, page_list) where {O, T}
- nbits = field.content_col.columnrecord.nbits
+ cr = field.content_col.columnrecord
pages = page_list[field.content_col.content_col_idx]

offset = read_field(io, field.offset_col, page_list)
- content = read_pagedesc(io, pages, nbits)
+ content = read_pagedesc(io, pages, cr)

o = one(eltype(offset))
jloffset = pushfirst!(offset .+ o, o) #change to 1-indexed, and add a 1 at the beginning
- res = String.(VectorOfVectors(content, jloffset, ArraysOfArrays.no_consistency_checks))
+ res = String.(VectorOfVectors(content, jloffset))
return res::_field_output_type(field)
end
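A toy illustration of the offset/content combination above, with hypothetical values (`VectorOfVectors` is the ArraysOfArrays.jl constructor the code already uses):

```julia
using ArraysOfArrays

content  = UInt8['f', 'o', 'o', 'b', 'a', 'r']  # flat bytes of "foo" and "bar"
offset   = [3, 6]                               # hypothetical end offsets from the offset column
jloffset = pushfirst!(offset .+ 1, 1)           # 1-indexed, with a leading 1: [1, 4, 7]
String.(VectorOfVectors(content, jloffset))     # -> ["foo", "bar"]
```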

@@ -65,16 +65,10 @@ end

_field_output_type(::Type{RNTupleCardinality{T}}) where {T} = CardinalityVector{T}
function read_field(io, field::RNTupleCardinality{T}, page_list) where T
- nbits = field.leaf_field.columnrecord.nbits
+ cr = field.leaf_field.columnrecord
pages = page_list[field.leaf_field.content_col_idx]
- typenum = field.leaf_field.columnrecord.type
- split = 14 <= typenum <= 21 || 26 <= typenum <= 28
- delta = 14 <= typenum <= 15
- bytes = read_pagedesc(io, pages, nbits; split)
+ bytes = read_pagedesc(io, pages, cr)
contents = reinterpret(T, bytes)
- if delta
- cumsum!(contents, contents)
- end
res = CardinalityVector(contents)
return res::_field_output_type(field)
end
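For context on the `delta` branch that this hunk removes (it moves into `read_pagedesc` further down): a delta-encoded column stores differences and is restored with a running sum, e.g.:

```julia
vals = Int64[3, 2, 4, 1]  # hypothetical per-entry deltas
cumsum!(vals, vals)       # -> [3, 5, 9, 10], the absolute values
```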
@@ -95,46 +89,22 @@ function _to_zigzag!(res::AbstractVector)
return res
end

- function _reset_to_incremental(res::AbstractVector, pages, ::Type{T}) where T
- endpoint = 0
- for pi in firstindex(pages):lastindex(pages)-1
- endpoint += pages[pi].num_elements
- res[endpoint+1] -= sum(@view res[begin:endpoint])
- end
- end
-
_field_output_type(::Type{LeafField{T}}) where {T} = Vector{T}
function read_field(io, field::LeafField{T}, page_list) where T
- nbits = field.columnrecord.nbits
+ cr = field.columnrecord
pages = page_list[field.content_col_idx]
- # handle split encoding within page
- typenum = field.columnrecord.type
- split = 14 <= typenum <= 21 || 26 <= typenum <= 28
- zigzag = 26 <= typenum <= 28
- delta = 14 <= typenum <= 15
- bytes = read_pagedesc(io, pages, nbits; split = split)
- res = collect(reinterpret(T, bytes))
- if zigzag
- _from_zigzag!(res)
- elseif delta
- # the Index32/64 resets to absolute offset page-by-page
- # https://github.com/JuliaHEP/UnROOT.jl/issues/312#issuecomment-1999875348
- if T <: Union{Index32, Index64} && length(pages) > 1
- _reset_to_incremental(res, pages, T)
- end
- cumsum!(res, res)
- end
+ res = collect(reinterpret(T, read_pagedesc(io, pages, cr)))
return res::_field_output_type(field)
end

_field_output_type(::Type{LeafField{Bool}}) = BitVector
function read_field(io, field::LeafField{Bool}, page_list)
- nbits = field.columnrecord.nbits
+ cr = field.columnrecord
pages = page_list[field.content_col_idx]
total_num_elements = sum(p.num_elements for p in pages)

# pad to nearest 8*k bytes because each chunk needs to be UInt64
- bytes = read_pagedesc(io, pages, nbits)
+ bytes = read_pagedesc(io, pages, cr)
append!(bytes, zeros(eltype(bytes), 8 - rem(total_num_elements, 8)))
chunks = reinterpret(UInt64, bytes)

@@ -205,3 +175,68 @@ function read_field(io, field::UnionField{S, T}, page_list) where {S, T}
res = UnionVector(_split_switch_bits(switch)..., content)
return res::_field_output_type(field)
end

+ function _detect_encoding(typenum)
+ split = 14 <= typenum <= 21 || 26 <= typenum <= 28
+ zigzag = 26 <= typenum <= 28
+ delta = 14 <= typenum <= 15
+ return split, zigzag, delta
+ end
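A quick illustration of the flags, derived purely from the ranges above (no claim is made here about which RNTuple type names the numbers correspond to):

```julia
# (split, zigzag, delta) per the ranges in _detect_encoding
_detect_encoding(14)  # -> (true, false, true)   split + delta
_detect_encoding(20)  # -> (true, false, false)  split only
_detect_encoding(27)  # -> (true, true, false)   split + zigzag
_detect_encoding(5)   # -> (false, false, false) plain
```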

"""
read_pagedesc(io, pagedescs::AbstractVector{PageDescription}, cr::ColumnRecord)
Read the decompressed raw bytes given a Page Description. The
`nbits` need to be provided according to the element type of the
column since `pagedesc` only contains `num_elements` information.
!!! note
We handle split, zigzag, and delta encodings inside this function.
"""
+ function read_pagedesc(io, pagedescs::AbstractVector{PageDescription}, cr::ColumnRecord)
+ nbits = cr.nbits
+ split, zigzag, delta = _detect_encoding(cr.type)
+ output_L = div(sum(p.num_elements for p in pagedescs; init=UInt32(0))*nbits, 8, RoundUp)
+ res = Vector{UInt8}(undef, output_L)
+
+ # a page max size is 64KB
+ tmp = Vector{UInt8}(undef, 65536)
+
+ tip = 1
+ for i in eachindex(pagedescs)
+ pagedesc = pagedescs[i]
+ # when nbits == 1 for bits, need RoundUp
+ uncomp_size = div(pagedesc.num_elements * nbits, 8, RoundUp)
+ dst = @view res[tip:tip+uncomp_size-1]
+ _read_locator!(tmp, io, pagedesc.locator, uncomp_size)
+ if split
+ if nbits == 16
+ split2_reinterpret!(dst, tmp)
+ elseif nbits == 32
+ split4_reinterpret!(dst, tmp)
+ elseif nbits == 64
+ split8_reinterpret!(dst, tmp)
+ end
+ else
+ dst .= tmp
+ end
+
+ shim = if nbits == 16
+ reinterpret(Int16, dst)
+ elseif nbits == 32
+ reinterpret(Int32, dst)
+ elseif nbits == 64
+ reinterpret(Int64, dst)
+ end
+
+ if delta
+ cumsum!(shim, shim)
+ elseif zigzag
+ _from_zigzag!(shim)
+ end
+
+ tip += uncomp_size
+ end
+
+ return res
+ end
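The `split2/4/8_reinterpret!` helpers called above are defined in `footer.jl` (only the tail of `split8_reinterpret!` shows in the second file below). As a hedged sketch of the idea, assuming split encoding stores all first bytes of every value, then all second bytes, and so on, a 4-byte decode would interleave the byte planes back:

```julia
# Sketch only, under the byte-plane assumption stated above; this is not
# UnROOT's split4_reinterpret! verbatim.
function split4_decode(src::AbstractVector{UInt8})
    n = length(src) ÷ 4
    dst = Vector{UInt8}(undef, 4n)
    @inbounds for i in 1:n
        dst[4i - 3] = src[i]         # byte plane 1
        dst[4i - 2] = src[n + i]     # byte plane 2
        dst[4i - 1] = src[2n + i]    # byte plane 3
        dst[4i]     = src[3n + i]    # byte plane 4
    end
    return dst
end
```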
44 changes: 0 additions & 44 deletions src/RNTuple/footer.jl
@@ -95,50 +95,6 @@ function split8_reinterpret!(dst, src::Vector{UInt8})
return dst
end

"""
read_pagedesc(io, pagedescs::AbstractVector{PageDescription}, nbits::Integer; split=false)
Read the decompressed raw bytes given a Page Description. The
`nbits` need to be provided according to the element type of the
column since `pagedesc` only contains `num_elements` information.
`split` is true when split encoding is needed, this is done per page.
!!! note
Boolean values are always stored as bit in RNTuple, so `nbits = 1`.
"""
function read_pagedesc(io, pagedescs::AbstractVector{PageDescription}, nbits::Integer; split=false)
output_L = div(sum(p.num_elements for p in pagedescs; init=UInt32(0))*nbits, 8, RoundUp)
res = Vector{UInt8}(undef, output_L)

# a page max size is 64KB
tmp = Vector{UInt8}(undef, 65536)

tip = 1
for i in eachindex(pagedescs)
pagedesc = pagedescs[i]
# when nbits == 1 for bits, need RoundUp
uncomp_size = div(pagedesc.num_elements * nbits, 8, RoundUp)
dst = @view res[tip:tip+uncomp_size-1]
_read_locator!(tmp, io, pagedesc.locator, uncomp_size)
if !split
dst .= tmp
elseif split
if nbits == 16
split2_reinterpret!(dst, tmp)
elseif nbits == 32
split4_reinterpret!(dst, tmp)
elseif nbits == 64
split8_reinterpret!(dst, tmp)
end
end
tip += uncomp_size
end

return res
end

# TODO: handle flags for shared cluster
@SimpleStruct struct ClusterSummary
first_entry_number::Int64
