diff --git a/src/bam/bai.jl b/src/bam/bai.jl index a017ca92..6237979f 100644 --- a/src/bam/bai.jl +++ b/src/bam/bai.jl @@ -33,6 +33,11 @@ function BAI(input::IO) return read_bai(input) end +function findbai(filepath::AbstractString) + baipath = string(filepath, ".bai") + return isfile(baipath) ? Nullable(BAI(baipath)) : Nullable{BAI}() +end + # Read a BAI object from `input`. function read_bai(input::IO) # check magic bytes diff --git a/src/bam/overlap.jl b/src/bam/overlap.jl index 378fc14b..54904e86 100644 --- a/src/bam/overlap.jl +++ b/src/bam/overlap.jl @@ -31,7 +31,10 @@ end # Iterator # -------- -mutable struct OverlapIteratorState +mutable struct OverlapIteratorState{S} + # reader's state + readerstate::ReaderState{S,Record} + # reference index refindex::Int @@ -40,30 +43,31 @@ mutable struct OverlapIteratorState # current chunk index chunkid::Int - - # pre-allocated record - record::Record end function Base.start(iter::OverlapIterator) - refindex = findfirst(iter.reader.refseqnames, iter.refname) + readerstate = ReaderState(iter.reader) + reader = readerstate.reader + refindex = findfirst(reader.refseqnames, iter.refname) if refindex == 0 throw(ArgumentError("sequence name $(iter.refname) is not found in the header")) end - @assert !isnull(iter.reader.index) - chunks = GenomicFeatures.Indexes.overlapchunks(get(iter.reader.index).index, refindex, iter.interval) + @assert !isnull(reader.index) + chunks = GenomicFeatures.Indexes.overlapchunks(get(reader.index).index, refindex, iter.interval) if !isempty(chunks) - seek(iter.reader, first(chunks).start) + seek(reader.input, first(chunks).start) end - return OverlapIteratorState(refindex, chunks, 1, Record()) + return OverlapIteratorState(readerstate, refindex, chunks, 1) end function Base.done(iter::OverlapIterator, state) + reader = state.readerstate.reader + record = state.readerstate.record while state.chunkid ≤ endof(state.chunks) chunk = state.chunks[state.chunkid] - while BGZFStreams.virtualoffset(iter.reader.stream) < chunk.stop - read!(iter.reader, state.record) - c = compare_intervals(state.record, (state.refindex, iter.interval)) + while BGZFStreams.virtualoffset(reader.input) < chunk.stop + read!(reader, record) + c = compare_intervals(record, (state.refindex, iter.interval)) if c == 0 # overlapping return false @@ -74,14 +78,14 @@ function Base.done(iter::OverlapIterator, state) end state.chunkid += 1 if state.chunkid ≤ endof(state.chunks) - seek(iter.reader, state.chunks[state.chunkid].start) + seek(reader.input, state.chunks[state.chunkid].start) end end return true end function Base.next(::OverlapIterator, state) - return copy(state.record), state + return copy(state.readerstate.record), state end function compare_intervals(record::Record, interval::Tuple{Int,UnitRange{Int}}) diff --git a/src/bam/reader.jl b/src/bam/reader.jl index e7b08f47..bfc39127 100644 --- a/src/bam/reader.jl +++ b/src/bam/reader.jl @@ -2,47 +2,100 @@ # ========== """ - BAM.Reader(input::IO; index=nothing) + BAM.Reader(input; index=nothing) Create a data reader of the BAM file format. # Arguments -* `input`: data source -* `index=nothing`: filepath to a random access index (currently *bai* is Supported) +* `input`: data source (filepath or readable `IO` object) +* `index=nothing`: filepath to a random access index (currently *bai* is supported) """ -mutable struct Reader{T} <: Bio.IO.AbstractReader - stream::BGZFStreams.BGZFStream{T} +mutable struct Reader{T<:Union{String,BGZFStreams.BGZFStream}} <: Bio.IO.AbstractReader + # data source + input::T + + # BAM index + index::Nullable{BAI} + + # header data header::SAM.Header - start_offset::BGZFStreams.VirtualOffset refseqnames::Vector{String} refseqlens::Vector{Int} - index::Nullable{BAI} end -function Base.eltype{T}(::Type{Reader{T}}) - return Record -end +function Reader(input::BGZFStreams.BGZFStream; index=nothing) + if index == nothing + index = Nullable{BAI}() + elseif index isa BAI + index = Nullable(index) + elseif index isa AbstractString + index = Nullable(BAI(index)) + elseif index isa Nullable{BAI} + # ok + else + error("unrecognizable index argument: $(typeof(index))") + end -function Bio.IO.stream(reader::Reader) - return reader.stream + # magic bytes + B = read(input, UInt8) + A = read(input, UInt8) + M = read(input, UInt8) + x = read(input, UInt8) + if B != UInt8('B') || A != UInt8('A') || M != UInt8('M') || x != 0x01 + error("input was not a valid BAM file") + end + + # SAM header + textlen = read(input, Int32) + samreader = SAM.Reader(IOBuffer(read(input, UInt8, textlen))) + + # reference sequences + refseqnames = String[] + refseqlens = Int[] + n_refs = read(input, Int32) + for _ in 1:n_refs + namelen = read(input, Int32) + data = read(input, UInt8, namelen) + seqname = unsafe_string(pointer(data)) + seqlen = read(input, Int32) + push!(refseqnames, seqname) + push!(refseqlens, seqlen) + end + + return Reader(input, index, samreader.header, refseqnames, refseqlens) end function Reader(input::IO; index=nothing) - if isa(index, AbstractString) - index = BAI(index) - else - if index != nothing - error("unrecognizable index argument") + return Reader(BGZFStreams.BGZFStream(input), index=index) +end + +function Reader(filepath::AbstractString; index=:auto) + if index isa Symbol + if index == :auto + index = findbai(filepath) + else + throw(ArgumentError("invalid index: ':$(index)'")) end + elseif index isa AbstractString + index = BAI(index) end - reader = init_bam_reader(input) - reader.index = index - return reader + return Reader(filepath, index, SAM.Header(), String[], Int[]) +end + +function Base.open(reader::Reader{String}) + return Reader(open(reader.input), index=reader.index) +end + +function Base.eltype{T}(::Type{Reader{T}}) + return Record +end + +function Bio.IO.stream(reader::Reader) + return reader.input end function Base.show(io::IO, reader::Reader) - println(io, summary(reader), ":") - print(io, " number of contigs: ", length(reader.refseqnames)) + print(io, summary(reader), "()") end """ @@ -70,81 +123,36 @@ function Bio.header(reader::Reader) return header(reader) end -function Base.seek(reader::Reader, voffset::BGZFStreams.VirtualOffset) - seek(reader.stream, voffset) +#function Base.seek(reader::Reader, voffset::BGZFStreams.VirtualOffset) +# seek(reader.stream, voffset) +#end +# +#function Base.seekstart(reader::Reader) +# seek(reader.stream, reader.start_offset) +#end + +struct ReaderState{S,T} + reader::S + record::T end -function Base.seekstart(reader::Reader) - seek(reader.stream, reader.start_offset) +function ReaderState(reader::Reader{<:BGZFStreams.BGZFStream}) + return ReaderState(reader, Record()) end -function Base.start(reader::Reader) - return Record() +function ReaderState(reader::Reader{String}) + return ReaderState(open(reader), Record()) end -function Base.done(reader::Reader, rec) - return eof(reader) +function Base.start(reader::Reader) + return ReaderState(reader) end -function Base.next(reader::Reader, rec) - read!(reader, rec) - return copy(rec), rec +function Base.done(::Reader, state) + return eof(state.reader.input) end -# Initialize a BAM reader by reading the header section. -function init_bam_reader(input::BGZFStreams.BGZFStream) - # magic bytes - B = read(input, UInt8) - A = read(input, UInt8) - M = read(input, UInt8) - x = read(input, UInt8) - if B != UInt8('B') || A != UInt8('A') || M != UInt8('M') || x != 0x01 - error("input was not a valid BAM file") - end - - # SAM header - textlen = read(input, Int32) - samreader = SAM.Reader(IOBuffer(read(input, UInt8, textlen))) - - # reference sequences - refseqnames = String[] - refseqlens = Int[] - n_refs = read(input, Int32) - for _ in 1:n_refs - namelen = read(input, Int32) - data = read(input, UInt8, namelen) - seqname = unsafe_string(pointer(data)) - seqlen = read(input, Int32) - push!(refseqnames, seqname) - push!(refseqlens, seqlen) - end - - voffset = isa(input.io, Pipe) ? - BGZFStreams.VirtualOffset(0, 0) : - BGZFStreams.virtualoffset(input) - return Reader( - input, - samreader.header, - voffset, - refseqnames, - refseqlens, - Nullable{BAI}()) -end - -function init_bam_reader(input::IO) - return init_bam_reader(BGZFStreams.BGZFStream(input)) -end - -function _read!(reader::Reader, record) - unsafe_read( - reader.stream, - pointer_from_objref(record), - FIXED_FIELDS_BYTES) - dsize = data_size(record) - if length(record.data) < dsize - resize!(record.data, dsize) - end - unsafe_read(reader.stream, pointer(record.data), dsize) - record.reader = reader - return record +function Base.next(::Reader, state) + read!(state.reader, state.record) + return copy(state.record), state end diff --git a/src/bam/record.jl b/src/bam/record.jl index 965eeb28..69c23324 100644 --- a/src/bam/record.jl +++ b/src/bam/record.jl @@ -86,7 +86,17 @@ function Base.show(io::IO, record::Record) end function Base.read!(reader::Reader, record::Record) - return _read!(reader, record) + unsafe_read( + reader.input, + pointer_from_objref(record), + FIXED_FIELDS_BYTES) + dsize = data_size(record) + if length(record.data) < dsize + resize!(record.data, dsize) + end + unsafe_read(reader.input, pointer(record.data), dsize) + record.reader = reader + return record end diff --git a/transcript_depth.jl b/transcript_depth.jl new file mode 100644 index 00000000..fc2a55b6 --- /dev/null +++ b/transcript_depth.jl @@ -0,0 +1,68 @@ +@everywhere begin + using BioAlignments + using GenomicFeatures + + # The main algorithm. + function compute_depth(reader, interval) + range = interval.first:interval.last + depth = zeros(Int, length(range)) + for record in eachoverlap(reader, interval) + if !BAM.ismapped(record) || !BAM.isprimary(record) + continue + end + aln = BAM.alignment(record) + for i in 1:BAM.seqlength(record) + j, op = seq2ref(aln, i) + if ismatchop(op) && j in range + @inbounds depth[j - first(range) + 1] += 1 + end + end + end + return depth + end +end + +# Sequential computation. +function transcript_depth0(bamfile, intervals) + reader = BAM.Reader(bamfile) + return map(intervals) do interval + return compute_depth(reader, interval) + end +end + +# Parallel computation using pmap (open BAM.Reader inside the closure). +function transcript_depth1(bamfile, intervals, batchsize) + pmap(intervals, batch_size=batchsize) do interval + reader = BAM.Reader(bamfile) + return compute_depth(reader, interval) + end +end + +# Parallel computation using pmap (open BAM.Reader outside the closure). +function transcript_depth2(bamfile, intervals, batchsize) + reader = BAM.Reader(bamfile) + return pmap(intervals, batch_size=batchsize) do interval + return compute_depth(reader, interval) + end +end + +bamfile = expanduser("./data/SRR1238088.sort.bam") +gff3file = expanduser("./data/TAIR10_GFF3_genes.gff") +intervals = collect(Interval, Iterators.filter(r->GFF3.seqid(r)=="Chr1" && GFF3.featuretype(r)=="gene", GFF3.Reader(open(gff3file)))) + +using DocOpt +args = docopt("Usage: main.jl [--batch_size=] ") +batch_size = args["--batch_size"] +if batch_size == nothing + batch_size = 30 +else + batch_size = parse(Int, batch_size) +end +f = eval(parse(args[""])) +func = () -> f == transcript_depth0 ? f(bamfile, intervals) : f(bamfile, intervals, batch_size) + +println(STDERR, sum(map(sum, func()))) +for i in 1:3 + gc() + println(@elapsed func()) +end