Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LowCardinality and Nullable #14

Merged
merged 5 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/Net.jl
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,31 @@ function read_col(sock::ClickHouseSock, num_rows::VarUInt)::Column
name = chread(sock, String)
type_name = chread(sock, String)

data = read_col_data(sock, num_rows, parse_typestring(type_name))
# Decode the column payload. An ArgumentError raised while decoding is re-raised
# as a generic error that names the column and its type string; any other
# exception propagates unchanged.
data = try
    read_col_data(sock, num_rows, parse_typestring(type_name))
catch e
    e isa ArgumentError || rethrow()
    # Interpolate `type_name` (the type string read from the socket above);
    # there is no variable called `type` in this scope.
    error("Error while reading col $(name) ($(type_name)): $(e.msg)")
end
Column(name, type_name, data)
end

function chwrite(sock::ClickHouseSock, x::Column)
chwrite(sock, x.name)
chwrite(sock, x.type)

write_col_data(sock, x.data, parse_typestring(x.type))
# Serialize the column payload. An ArgumentError is turned into a generic error
# that names the column and its type; every other exception is rethrown as is.
try
    write_col_data(sock, x.data, parse_typestring(x.type))
catch err
    if !(err isa ArgumentError)
        rethrow(err)
    end
    error("Error while writing col $(x.name) ($(x.type)): $(err.msg)")
end
end

struct Block
Expand Down
7 changes: 7 additions & 0 deletions src/columns/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ macro _primitive_columns(args...)
return chwrite(sock, data)
end
end )
# Generated fallback writer for this primitive type: accepts any
# AbstractVector and converts it to a dense Vector of the primitive element
# type before handing it to chwrite. The conversion throws if an element
# cannot be represented in that type.
# NOTE(review): presumably a more specific Vector method is generated just
# above, so this one only serves non-Vector or differently-typed inputs;
# confirm against the part of the macro that is not visible here.
push!(funcs, quote
function write_col_data(sock::ClickHouseSock,
data::AbstractVector,
::Val{Symbol($arg_string)})
return chwrite(sock, convert(Vector{$arg},data))
end
end )
push!(funcs, quote deserialize(::Val{Symbol($arg_string)}) = $arg end )
end
return esc(:($(funcs...),))
Expand Down
3 changes: 3 additions & 0 deletions src/columns/Interfaces.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ is_ch_type(::Val{N}) where {N} = false
is_ch_type(str::String) = is_ch_type(Val(Symbol(str)))
is_ch_type(s::Symbol) = is_ch_type(Val(s))

# Trait: may the column type named `N` be wrapped in `Nullable`?
# The default answer is yes; individual column types opt out by adding a more
# specific `Val{:Name}` method that returns false.
function can_be_nullable(::Val{N}) where {N}
    return true
end

# Convenience form taking the type name as a Symbol.
function can_be_nullable(s::Symbol)
    return can_be_nullable(Val(s))
end

function read_col_data(sock::ClickHouseSock,
num_rows::VarUInt, ::Val{N}, args...) where {N}
throw(
Expand Down
100 changes: 100 additions & 0 deletions src/columns/LowCardinality.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using UUIDs
# `LowCardinality` is a recognised column type name.
function is_ch_type(::Val{:LowCardinality})
    return true
end

# `LowCardinality` opts out of being wrapped in `Nullable`.
function can_be_nullable(::Val{:LowCardinality})
    return false
end

# Flag bit of the serialization-type word: additional keys must be read.
# Additional keys are stored before the indexes, as a count N followed by
# the N keys themselves.
const lc_has_additional_keys_bit = 1 << 9
# Flag bit of the serialization-type word: the dictionary must be updated,
# meaning the previous granule used a different dictionary.
const lc_need_update_dictionary = 1 << 10

# Flag combination used when writing a column; the writer ORs the key-width
# code into it to form the serialization-type word.
const lc_serialization_type = lc_has_additional_keys_bit | lc_need_update_dictionary

# Element type of the key array, looked up by (key-width code + 1).
const lc_index_int_types = [:UInt8, :UInt16, :UInt32, :UInt64]


"""
    make_result(index::Vector{T}, keys, is_nullable)

Assemble a `CategoricalVector` whose levels are the dictionary `index` and whose
reference codes are `keys`. The element type is `Union{T, Missing}` when
`is_nullable` is true and `T` otherwise.
"""
function make_result(index::Vector{T}, keys, is_nullable) where {T}
    ElT = is_nullable ? Union{T, Missing} : T
    # Start from an empty vector that only carries the levels, then install
    # the reference codes directly.
    categorical = CategoricalVector{ElT}(undef, 0, levels = index)
    categorical.refs = keys
    return categorical
end

"""
    make_result(index::CategoricalVector{T}, keys, is_nullable)

Variant for a dictionary that was itself decoded as a `CategoricalVector`: its
entries are unwrapped with `get` and used as the levels of the result, whose
reference codes are `keys`. The element type is `Union{T, Missing}` when
`is_nullable` is true and `T` otherwise.
"""
function make_result(index::CategoricalVector{T}, keys, is_nullable) where {T}
    level_values = get.(index)
    categorical = if is_nullable
        CategoricalVector{Union{T, Missing}}(undef, 0, levels = level_values)
    else
        CategoricalVector{T}(undef, 0, levels = level_values)
    end
    categorical.refs = keys
    return categorical
end


"""
    read_col_data(sock, num_rows, ::Val{:LowCardinality}, nested)

Read a `LowCardinality(nested)` column from `sock` and return it as a
`CategoricalVector` built by `make_result`.

The data is read in this order: serialization version (must be 1),
serialization-type word (its low bits select the key integer type), dictionary
size and dictionary values, key count and keys.

When `nested` is `Nullable(...)` the first dictionary entry is dropped and key 0
becomes the missing reference. Otherwise the zero-based keys are shifted by one
to become one-based level references.

With zero rows the call is forwarded to the reader of `nested` and no
LowCardinality header is consumed.

# Throws
- `ErrorException` for an unsupported serialization version or an unknown key
  integer type code.
"""
function read_col_data(sock::ClickHouseSock, num_rows::VarUInt,
                       ::Val{:LowCardinality}, nested::TypeAst)

    UInt64(num_rows) == 0 && return read_col_data(sock, num_rows, nested)

    is_nested_nullable = (nested.name == :Nullable)
    notnullable_nested = is_nested_nullable ? nested.args[1] : nested

    ver = chread(sock, UInt64) # KeysSerializationVersion
    ver == 1 || error("unsupported LC serialization version: $(ver)")

    serialization_type = chread(sock, UInt64)
    int_type = serialization_type & 0xf
    # Fail with a readable message instead of a bare BoundsError below.
    int_type < length(lc_index_int_types) ||
        error("unsupported LC index type: $(int_type)")

    index_size = chread(sock, UInt64)
    index = read_col_data(sock, VarUInt(index_size), notnullable_nested)
    # For a nullable nested type the first dictionary slot stands for NULL.
    is_nested_nullable && (index = index[2:end])

    keys_size = chread(sock, UInt64)
    keys = read_col_data(sock, VarUInt(keys_size), Val(lc_index_int_types[int_type + 1]))

    if !is_nested_nullable
        # Keys are zero-based on the wire, level references are one-based.
        # Widen before adding: adding 1 in the wire type would wrap the largest
        # key (e.g. 0xff for UInt8 keys) around to 0.
        keys = keys .+ UInt64(1)
    end

    return make_result(index, keys, is_nested_nullable)
end


"""
    write_col_data(sock, data::AbstractCategoricalVector, ::Val{:LowCardinality}, nested)

Write `data` to `sock` as a `LowCardinality(nested)` column.

The serialization version is always written. For non-empty `data` it is followed
by the serialization-type word, the dictionary (size, then values) and the keys
(count, then values).

When `nested` is `Nullable(...)` a placeholder value is prepended to the
dictionary so that key 0 stands for NULL. Otherwise the one-based level
references are shifted down by one.

The key integer type is the narrowest of UInt8/UInt16/UInt32/UInt64 that can
hold the largest possible key, i.e. the dictionary length minus one.
"""
function write_col_data(sock::ClickHouseSock,
                        data::AbstractCategoricalVector{T},
                        ::Val{:LowCardinality}, nested::TypeAst) where {T}

    is_nested_nullable = (nested.name == :Nullable)
    notnullable_nested = is_nested_nullable ? nested.args[1] : nested

    # KeysSerializationVersion. See ClickHouse docs.
    chwrite(sock, Int64(1))
    isempty(data) && return

    index = is_nested_nullable ?
        vcat(missing_replacement(T), levels(data)) :
        levels(data)

    # Pick the key width from the number of dictionary entries actually sent
    # (including the NULL placeholder): codes 0..3 map to UInt8..UInt64.
    max_key = length(index) - 1
    int_type = max_key <= typemax(UInt8) ? 0 :
               max_key <= typemax(UInt16) ? 1 :
               max_key <= typemax(UInt32) ? 2 : 3

    serialization_type = lc_serialization_type | int_type
    chwrite(sock, serialization_type)

    chwrite(sock, length(index))
    write_col_data(sock, index, notnullable_nested)

    chwrite(sock, length(data))

    # In C++ indexes start from 0. With a nullable nested type 0 means NULL and
    # the references can be sent as they are; otherwise subtract 1 from each.
    keys = is_nested_nullable ? data.refs : data.refs .- 1
    write_col_data(sock, keys, Val(lc_index_int_types[int_type + 1]))
end

"""
    write_col_data(sock, data::AbstractVector, v::Val{:LowCardinality}, nested)

Fallback for non-categorical input: wrap `data` in a `CategoricalVector` with the
same element type and delegate to the categorical writer.
"""
function write_col_data(sock::ClickHouseSock,
                        data::AbstractVector{T},
                        v::Val{:LowCardinality}, nested::TypeAst) where {T}
    categorical = CategoricalVector{T}(data)
    return write_col_data(sock, categorical, v, nested)
end
80 changes: 80 additions & 0 deletions src/columns/Nullable.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using UUIDs
# `Nullable` is a recognised column type name.
function is_ch_type(::Val{:Nullable})
    return true
end

# `Nullable` cannot itself be wrapped in another `Nullable`.
function can_be_nullable(::Val{:Nullable})
    return false
end

# Widen the element type of `data` from `T` to `Union{T, Missing}` so that
# individual entries can afterwards be set to `missing`.
function convert_to_missings(data::Vector{T}) where {T}
    return convert(Vector{Union{T, Missing}}, data)
end

# Same widening for categorical vectors.
function convert_to_missings(data::CategoricalVector{T}) where {T}
    return convert(CategoricalVector{Union{T, Missing}}, data)
end

"""
    read_col_data(sock, num_rows, ::Val{:Nullable}, nested)

Read a `Nullable(nested)` column: first one `UInt8` flag per row, then the
values of the nested column. Rows whose flag equals 1 are replaced by `missing`
in the returned vector.
"""
function read_col_data(sock::ClickHouseSock, num_rows::VarUInt,
                       ::Val{:Nullable}, nested::TypeAst)
    null_flags = chread(sock, Vector{UInt8}, num_rows)
    result = convert_to_missings(read_col_data(sock, num_rows, nested))
    for (row, flag) in enumerate(null_flags)
        if flag == 0x1
            result[row] = missing
        end
    end
    return result
end

# Placeholder written in place of `missing` for each supported element type.
function missing_replacement(::Type{T}) where {T <: Number}
    return zero(T)
end
missing_replacement(::Type{String}) = ""
missing_replacement(::Type{Date}) = Date(1970)
missing_replacement(::Type{DateTime}) = unix2datetime(0)
missing_replacement(::Type{UUID}) = UUID(0)

# For `Union{T, Missing}` use the placeholder of the non-missing part.
function missing_replacement(::Type{Union{T, Missing}}) where {T}
    return missing_replacement(T)
end


# Null flag for a single value: 1 when it is `missing`, 0 otherwise.
function uint8_ismissing(v)::UInt8
    return ismissing(v) ? 1 : 0
end

"""
    write_col_data(sock, data::AbstractVector{Union{Missing, T}}, ::Val{:Nullable}, nested)

Write a `Nullable(nested)` column: one `UInt8` null flag per element, followed by
the values written as a `nested` column. Missing entries are replaced by
`missing_replacement(T)` in the value part.

Raises an error when `nested` is a type that cannot be wrapped in `Nullable`.
"""
function write_col_data(sock::ClickHouseSock,
                        data::AbstractVector{Union{Missing, T}},
                        ::Val{:Nullable}, nested::TypeAst) where {T}
    if !can_be_nullable(nested.name)
        error("$(nested.name) cannot be inside Nullable")
    end

    null_flags = uint8_ismissing.(data)
    chwrite(sock, null_flags)

    if any(flag -> flag > 0, null_flags)
        # At least one missing entry: substitute a valid placeholder value.
        filler = missing_replacement(T)
        payload = [ismissing(v) ? filler : v for v in data]
    else
        # Nothing is missing, so narrowing the element type is enough.
        payload = convert(Vector{T}, data)
    end

    return write_col_data(sock, payload, nested)
end

"""
    write_col_data(sock, data::AbstractVector{T}, ::Val{:Nullable}, nested)

Write a `Nullable(nested)` column from data whose element type does not admit
`missing`: an all-zero null map (one `UInt8` per element) followed by `data`
written as a `nested` column.

Raises an error when `nested` is a type that cannot be wrapped in `Nullable`.
"""
function write_col_data(sock::ClickHouseSock,
                        data::AbstractVector{T},
                        ::Val{:Nullable}, nested::TypeAst) where {T}
    !can_be_nullable(nested.name) &&
        error("$(nested.name) cannot be inside Nullable")

    # One zero byte per row. Built from the length (not a range) and as UInt8,
    # the same element type the other Nullable writers and the reader use.
    missing_map = zeros(UInt8, length(data))
    chwrite(sock, missing_map)
    write_col_data(sock, data, nested)
end

"""
    write_col_data(sock, data::AbstractCategoricalVector{Union{Missing, T}}, ::Val{:Nullable}, nested)

Write a categorical vector that may contain `missing` as a `Nullable(nested)`
column: one `UInt8` null flag per element, followed by the values written as a
`nested` column. In the value part, missing entries are pointed at the first
level so that every reference is valid.

Raises an error when `nested` is a type that cannot be wrapped in `Nullable`.
"""
function write_col_data(sock::ClickHouseSock,
                        data::AbstractCategoricalVector{Union{Missing, T}},
                        ::Val{:Nullable}, nested::TypeAst) where {T}
    !can_be_nullable(nested.name) &&
        error("$(nested.name) cannot be inside Nullable")
    missing_map = uint8_ismissing.(data)
    chwrite(sock, missing_map)
    unmissing = if !any(x -> x > 0, missing_map)
        convert(CategoricalVector{T}, data)
    else
        # Work on a copy so the caller's vector is left untouched.
        tmp = deepcopy(data)
        # Replace missing (it is always 0 in the refs of a CategoricalVector)
        # with something valid.
        replace!(tmp.refs, 0=>1)
        convert(CategoricalVector{T}, tmp)
    end

    write_col_data(sock, unmissing, nested)
end
1 change: 1 addition & 0 deletions src/columns/Tuple.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# `Tuple` is a recognised column type name.
is_ch_type(::Val{:Tuple}) = true
# `Tuple` cannot be wrapped in `Nullable`. Dispatch is on the symbol `:Tuple`,
# which is what the Symbol-based lookup produces via `Val(s)`; dispatching on
# the type `Tuple` would never match that lookup.
can_be_nullable(::Val{:Tuple}) = false

function read_col_data(sock::ClickHouseSock, num_rows::VarUInt,
::Val{:Tuple}, args::TypeAst...)
Expand Down
2 changes: 2 additions & 0 deletions src/columns/columns.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ include("Enum.jl")
include("FixedString.jl")
include("Tuple.jl")
include("UUID.jl")
include("Nullable.jl")
include("LowCardinality.jl")
Loading