Skip to content

Commit

Permalink
Add an FCC example including MT support
Browse files Browse the repository at this point in the history
  • Loading branch information
peremato authored Mar 18, 2024
1 parent 66d1159 commit ca4e800
Show file tree
Hide file tree
Showing 13 changed files with 6,782 additions and 119 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ FHist = "0.10"
Graphs = "1"
StaticArrays = "1.9"
StructArrays = "0.6"
UnROOT = "=0.10.22"
UnROOT = "0.10"
YAML = "0.4"
julia = "1"

Expand Down
86 changes: 39 additions & 47 deletions examples/FCC/analysis_MT.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,16 @@ using EDM4hep.RootIO
using EDM4hep.SystemOfUnits
using EDM4hep.Histograms
using DataFrames
using Base.Threads
using Profile

# Minimal local stand-in for EDM4hep's ObjectID POD: a typed reference to an
# element of a collection, used to decode relation branches such as "Muon#0".
# NOTE(review): this duplicates a type the EDM4hep library presumably provides;
# confirm whether it is still needed or can be replaced by the library type.
struct _ObjectID{ED <: EDM4hep.POD} <: EDM4hep.POD
    index::Int32          # zero-based index into the referenced collection (callers add 1)
    collectionID::UInt32  # in some cases (reading from files) the collection ID is -2
end

include("analysis_functions.jl")

# Input file: read directly from CERN EOS via xrootd; keep the local-copy path
# around (commented) for offline work.
#f = "/Users/mato/cernbox/Data/events_000189367.root"
f = "root://eospublic.cern.ch//eos/experiment/fcc/ee/generation/DelphesEvents/winter2023/IDEA/p8_ee_ZZ_ecm240/events_000189367.root"
reader = RootIO.Reader(f);
events = RootIO.get(reader, "events");

const N = Threads.nthreads()     # number of worker threads available
const tasks_per_thread = 4       # over-decomposition factor for task-based chunking

mutable struct MyData
df::DataFrame
Expand All @@ -38,11 +31,14 @@ function Base.append!(d1::MyData, d2::MyData)
d1.sevts += d2.sevts
end

function myiteration!(data::MyData, evt)
function myiteration!(data::MyData, reader, evt)
data.pevts += 1
recps = RootIO.get(reader, evt, "ReconstructedParticles", register=false);
_muons = RootIO.get(reader, evt, "Muon#0"; btype=_ObjectID{ReconstructedParticle}, register=false)
muons = [recps[mid.index+1] for mid in _muons]
μIDs = RootIO.get(reader, evt, "Muon#0")
length(μIDs) < 2 && return

recps = RootIO.get(reader, evt, "ReconstructedParticles")
muons = recps[μIDs]

sel_muons = filter(x -> pₜ(x) > 10GeV, muons)
zed_leptonic = resonanceBuilder(91GeV, sel_muons)
zed_leptonic_recoil = recoilBuilder(240GeV, zed_leptonic)
Expand All @@ -57,52 +53,46 @@ function myiteration!(data::MyData, evt)
end
end

"""
    myanalysis!(data::MyData, reader, events)

Run `myiteration!` on every event in `events`, accumulating the selection
results into `data`. Returns `data` so the function composes with the
task-based driver, which `fetch`es the return value.
"""
function myanalysis!(data::MyData, reader, events)
    for evt in events
        myiteration!(data, reader, evt)
    end
    return data
end


"""
    do_analysis_mt!(data, afunc, reader, events)

Run `afunc` over `events` with task-based multithreading: the events are
partitioned into chunks, one task is spawned per chunk with its own fresh
`MyData` accumulator (no shared mutable state), and the per-task results
are merged into `data`. Returns `data`.
"""
function do_analysis_mt!(data, afunc, reader, events)
    N = Threads.nthreads()
    # Empty the accumulated data from any previous run
    empty!(data)
    # Chunk the total number of events to process; guard against a zero chunk
    # size when there are fewer events than tasks (Iterators.partition
    # requires n >= 1)
    chunk_size = max(1, length(events) ÷ (tasks_per_thread * N))
    chunks = Iterators.partition(events, chunk_size)
    # Spawn one task per chunk, each with a private accumulator
    tasks = map(chunks) do chunk
        Threads.@spawn afunc(MyData(), reader, chunk)
    end
    # Wait for all tasks and merge their results into `data`
    results = fetch.(tasks)
    append!.(Ref(data), results)
    return data
end

"""
    do_analysis_serial!(data, afunc, reader, events)

Run the analysis function `afunc(data, reader, events)` serially, after
emptying `data` from any previous run. Returns `data`.
"""
function do_analysis_serial!(data, afunc, reader, events)
    # Empty the accumulated data from any previous run
    empty!(data)
    afunc(data, reader, events)
    return data
end

"""
    do_analysis_threads!(data, afunc, reader, events)

Run `afunc` once per event using `Threads.@threads`, accumulating into
per-thread copies of `data` that are merged at the end. Returns `data`.
"""
function do_analysis_threads!(data, afunc, reader, events)
    N = Threads.nthreads()
    # Empty the accumulated data from any previous run
    empty!(data)
    # One private accumulator per thread to avoid unsynchronized sharing
    vdata = [deepcopy(data) for _ in 1:N]
    # NOTE(review): indexing buffers by threadid() is fragile — with the
    # default :dynamic schedule a task may migrate between threads; a
    # chunked or task-local reduction would be more robust. Confirm.
    Threads.@threads for evt in events
        tid = Threads.threadid()
        afunc(vdata[tid], reader, evt)
    end
    # Merge the per-thread results
    for i in 1:N
        append!(data, vdata[i])
    end
    return data
end

mydata = MyData()

# Time each strategy twice: the first run includes JIT compilation, so the
# second run reflects steady-state performance.
@info "Serial 1st run"
@time do_analysis_serial!(mydata, myanalysis!, reader, events);
@info "Serial 2nd run"
@time do_analysis_serial!(mydata, myanalysis!, reader, events);

@info "Chunk 1st run"
@time do_analysis_mt!(mydata, myanalysis!, reader, events);
#Profile.clear_malloc_data()
@info "Chunk 2nd run"
@time do_analysis_mt!(mydata, myanalysis!, reader, events);

@info "Threads 1st run"
@time do_analysis_threads!(mydata, myiteration!, reader, events);
@info "Threads 2nd run"
@time do_analysis_threads!(mydata, myiteration!, reader, events);

#using Parquet2
#Parquet2.writefile("m_H-recoil.parquet", mydata.df)
Expand Down
Loading

0 comments on commit ca4e800

Please sign in to comment.