Skip to content

Commit

Permalink
Added multi-file support. Added Analysis module.
Browse files Browse the repository at this point in the history
  • Loading branch information
peremato committed Mar 19, 2024
1 parent ca4e800 commit 7d97dae
Show file tree
Hide file tree
Showing 9 changed files with 1,105 additions and 680 deletions.
142 changes: 52 additions & 90 deletions examples/FCC/analysis_MT.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,127 +2,89 @@ using EDM4hep
using EDM4hep.RootIO
using EDM4hep.SystemOfUnits
using EDM4hep.Histograms
using EDM4hep.Analysis
using DataFrames

include("analysis_functions.jl")

f = "root://eospublic.cern.ch//eos/experiment/fcc/ee/generation/DelphesEvents/winter2023/IDEA/p8_ee_ZZ_ecm240/events_000189367.root"
#f = "/Users/mato/cernbox/Data/events_000189367.root"
reader = RootIO.Reader(f);
fnames = """
events_000189367.root
events_000787350.root
events_001145354.root
events_001680909.root
events_001893485.root
events_002227306.root
events_002498645.root
events_002528960.root
events_002763770.root
events_003579490.root
"""
froot = "root://eospublic.cern.ch//eos/experiment/fcc/ee/generation/DelphesEvents/winter2023/IDEA/p8_ee_ZZ_ecm240"
files = joinpath.(Ref(froot),split(fnames))

files = "root://eospublic.cern.ch//eos/experiment/fcc/ee/generation/DelphesEvents/winter2023/IDEA/p8_ee_ZZ_ecm240/events_000189367.root"
#files = "/Users/mato/cernbox/Data/events_000189367.root"

reader = RootIO.Reader(files);
events = RootIO.get(reader, "events");

const N = Threads.nthreads()
const tasks_per_thread = 4

mutable struct MyData
mutable struct MyData <: AbstractAnalysisData
df::DataFrame
pevts::Int64
sevts::Int64
MyData() = new(DataFrame(Zcand_m = Float32[], Zcand_recoil_m = Float32[], Zcand_q = Int32[]), 0, 0)
end
function Base.empty!(data::MyData)
empty!(data.df)
data.pevts = 0
data.sevts = 0
end
function Base.append!(d1::MyData, d2::MyData)
append!(d1.df, d2.df)
d1.pevts += d2.pevts
d1.sevts += d2.sevts
end

function myiteration!(data::MyData, reader, evt)
data.pevts += 1
μIDs = RootIO.get(reader, evt, "Muon#0")
length(μIDs) < 2 && return

recps = RootIO.get(reader, evt, "ReconstructedParticles")
muons = recps[μIDs]

sel_muons = filter(x -> pₜ(x) > 10GeV, muons)
zed_leptonic = resonanceBuilder(91GeV, sel_muons)
zed_leptonic_recoil = recoilBuilder(240GeV, zed_leptonic)
if length(zed_leptonic) == 1 # Filter to have exactly one Z candidate
Zcand_m = zed_leptonic[1].mass
Zcand_recoil_m = zed_leptonic_recoil[1].mass
Zcand_q = zed_leptonic[1].charge
if 80GeV <= Zcand_m <= 100GeV
push!(data.df, (Zcand_m, Zcand_recoil_m, Zcand_q))
data.sevts += 1
end
end
end

function myanalysis!(data::MyData, reader, events)
for evt in events
myiteration!(data, reader, evt)
end
return data
end


function do_analysis_mt!(data, afunc, reader, events)
N = Threads.nthreads()
# Empty the data
empty!(data)

# Chunk the total number of events to process
chunks = Iterators.partition(events, length(events) ÷ (tasks_per_thread * N))
# Spawn the tasks
tasks = map(chunks) do chunk
Threads.@spawn afunc(MyData(), reader, chunk)
end
# Wait and sum the reduce the results
results = fetch.(tasks)
append!.(Ref(data), results)
return data
end

function do_analysis_serial!(data, afunc, reader, events)
# Empty the data
empty!(data)
afunc(data, reader, events)
return data
end

function do_analysis_threads!(data, afunc, reader, events)
N = Threads.nthreads()
# Empty the data
empty!(data)
vdata = [deepcopy(data) for i in 1:N]
Threads.@threads for evt in events
tid = Threads.threadid()
afunc(vdata[tid], reader, evt)
end
for i in 1:N
append!(data, vdata[i])
data.pevts += 1 # count process events
μIDs = RootIO.get(reader, evt, "Muon#0") # get the ids of muons
length(μIDs) < 2 && continue # skip if less than 2

recps = RootIO.get(reader, evt, "ReconstructedParticles")
muons = recps[μIDs] # use the ids to subset the reco particles

sel_muons = filter(x -> pₜ(x) > 10GeV, muons) # select the the Pt of muons
zed_leptonic = resonanceBuilder(91GeV, sel_muons)
zed_leptonic_recoil = recoilBuilder(240GeV, zed_leptonic)
if length(zed_leptonic) == 1 # filter to have exactly one Z candidate
Zcand_m = zed_leptonic[1].mass
Zcand_recoil_m = zed_leptonic_recoil[1].mass
Zcand_q = zed_leptonic[1].charge
if 80GeV <= Zcand_m <= 100GeV # select on mass of Z candidate, push to dataframe
push!(data.df, (Zcand_m, Zcand_recoil_m, Zcand_q))
data.sevts += 1 # count selected events
end
end
end
return data
end

mydata = MyData()
#subset = @view events[1:10000]
subset = events

@info "Serial 1st run"
@time do_analysis_serial!(mydata, myanalysis!, reader, events);
@time do_analysis!(mydata, myanalysis!, reader, subset);
@info "Serial 2nd run"
@time do_analysis_serial!(mydata, myanalysis!, reader, events);
@time do_analysis!(mydata, myanalysis!, reader, subset);
println("Processed events: $(mydata.pevts), selected: $(mydata.sevts)")
mydata.df |> describe |> println

@info "Chunk 1st run"
@time do_analysis_mt!(mydata, myanalysis!, reader, events);
@info "MT 1st run"
@time do_analysis!(mydata, myanalysis!, reader, subset; mt=true);
#Profile.clear_malloc_data()
@info "Chunk 2nd run"
@time do_analysis_mt!(mydata, myanalysis!, reader, events);

@info "Threads 1st run"
@time do_analysis_threads!(mydata, myiteration!, reader, events);
@info "Threads 2nd run"
@time do_analysis_threads!(mydata, myiteration!, reader, events);
@info "MT 2nd run"
@time do_analysis!(mydata, myanalysis!, reader, subset);
println("Processed events: $(mydata.pevts), selected: $(mydata.sevts)")
mydata.df |> describe |> println

#using Parquet2
#Parquet2.writefile("m_H-recoil.parquet", mydata.df)

#using Plots
#histogram(sum_df.Zcand_m)



1,009 changes: 488 additions & 521 deletions examples/FCC/analysis_mH-recoil-MT.ipynb

Large diffs are not rendered by default.

48 changes: 24 additions & 24 deletions podio/genStructArrays.jl → podio/genStructArrays-v16.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ function StructArray{SimTrackerHit, bname}(evt::UnROOT.LazyEvent, collid = UInt3
getproperty(evt, Symbol(bname, :_quality)),
StructArray{Vector3d, Symbol(bname, :_position)}(evt, collid, len),
StructArray{Vector3f, Symbol(bname, :_momentum)}(evt, collid, len),
StructArray{ObjectID{MCParticle}, isnewpodio() ? Symbol(:_, bname, "_MCParticle") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, Symbol(bname, "#0")}(evt, collid, len),
)
return StructArray{SimTrackerHit}(columns)
end
Expand Down Expand Up @@ -65,7 +65,7 @@ function StructArray{Vertex, bname}(evt::UnROOT.LazyEvent, collid = UInt32(0), l
StructArray{SVector{6,Float32}}(reshape(getproperty(evt, Symbol(bname, "_covMatrix[6]")), 6, len);dims=1),
getproperty(evt, Symbol(bname, :_algorithmType)),
StructArray{PVector{Vertex,Float32,1}, Symbol(bname, :_parameters)}(evt, collid, len),
StructArray{ObjectID{POD}, isnewpodio() ? Symbol(:_, bname, "_associatedParticle") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{POD}, Symbol(bname, "#0")}(evt, collid, len),
)
return StructArray{Vertex}(columns)
end
Expand Down Expand Up @@ -127,8 +127,8 @@ function StructArray{MCRecoTrackParticleAssociation, bname}(evt::UnROOT.LazyEven
len = length(firstmem)
columns = (StructArray{ObjectID{MCRecoTrackParticleAssociation}}((collect(0:len-1),fill(collid,len))),
firstmem,
StructArray{ObjectID{Track}, isnewpodio() ? Symbol(:_, bname, "_rec") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, isnewpodio() ? Symbol(:_, bname, "_sim") : Symbol(bname, "#1")}(evt, collid, len),
StructArray{ObjectID{Track}, Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, Symbol(bname, "#1")}(evt, collid, len),
)
return StructArray{MCRecoTrackParticleAssociation}(columns)
end
Expand All @@ -142,7 +142,7 @@ function StructArray{TrackerPulse, bname}(evt::UnROOT.LazyEvent, collid = UInt32
getproperty(evt, Symbol(bname, :_charge)),
getproperty(evt, Symbol(bname, :_quality)),
StructArray{SVector{3,Float32}}(reshape(getproperty(evt, Symbol(bname, "_covMatrix[3]")), 3, len);dims=1),
StructArray{ObjectID{TimeSeries}, isnewpodio() ? Symbol(:_, bname, "_timeSeries") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{TimeSeries}, Symbol(bname, "#0")}(evt, collid, len),
)
return StructArray{TrackerPulse}(columns)
end
Expand All @@ -152,8 +152,8 @@ function StructArray{MCRecoParticleAssociation, bname}(evt::UnROOT.LazyEvent, co
len = length(firstmem)
columns = (StructArray{ObjectID{MCRecoParticleAssociation}}((collect(0:len-1),fill(collid,len))),
firstmem,
StructArray{ObjectID{ReconstructedParticle}, isnewpodio() ? Symbol(:_, bname, "_rec") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, isnewpodio() ? Symbol(:_, bname, "_sim") : Symbol(bname, "#1")}(evt, collid, len),
StructArray{ObjectID{ReconstructedParticle}, Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, Symbol(bname, "#1")}(evt, collid, len),
)
return StructArray{MCRecoParticleAssociation}(columns)
end
Expand All @@ -163,8 +163,8 @@ function StructArray{MCRecoCaloAssociation, bname}(evt::UnROOT.LazyEvent, collid
len = length(firstmem)
columns = (StructArray{ObjectID{MCRecoCaloAssociation}}((collect(0:len-1),fill(collid,len))),
firstmem,
StructArray{ObjectID{CalorimeterHit}, isnewpodio() ? Symbol(:_, bname, "_rec") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{SimCalorimeterHit}, isnewpodio() ? Symbol(:_, bname, "_sim") : Symbol(bname, "#1")}(evt, collid, len),
StructArray{ObjectID{CalorimeterHit}, Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{SimCalorimeterHit}, Symbol(bname, "#1")}(evt, collid, len),
)
return StructArray{MCRecoCaloAssociation}(columns)
end
Expand All @@ -191,7 +191,7 @@ function StructArray{CaloHitContribution, bname}(evt::UnROOT.LazyEvent, collid =
getproperty(evt, Symbol(bname, :_energy)),
getproperty(evt, Symbol(bname, :_time)),
StructArray{Vector3f, Symbol(bname, :_stepPosition)}(evt, collid, len),
StructArray{ObjectID{MCParticle}, isnewpodio() ? Symbol(:_, bname, "_particle") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, Symbol(bname, "#0")}(evt, collid, len),
)
return StructArray{CaloHitContribution}(columns)
end
Expand All @@ -201,8 +201,8 @@ function StructArray{MCRecoTrackerHitPlaneAssociation, bname}(evt::UnROOT.LazyEv
len = length(firstmem)
columns = (StructArray{ObjectID{MCRecoTrackerHitPlaneAssociation}}((collect(0:len-1),fill(collid,len))),
firstmem,
StructArray{ObjectID{TrackerHitPlane}, isnewpodio() ? Symbol(:_, bname, "_rec") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{SimTrackerHit}, isnewpodio() ? Symbol(:_, bname, "_sim") : Symbol(bname, "#1")}(evt, collid, len),
StructArray{ObjectID{TrackerHitPlane}, Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{SimTrackerHit}, Symbol(bname, "#1")}(evt, collid, len),
)
return StructArray{MCRecoTrackerHitPlaneAssociation}(columns)
end
Expand All @@ -212,8 +212,8 @@ function StructArray{MCRecoCaloParticleAssociation, bname}(evt::UnROOT.LazyEvent
len = length(firstmem)
columns = (StructArray{ObjectID{MCRecoCaloParticleAssociation}}((collect(0:len-1),fill(collid,len))),
firstmem,
StructArray{ObjectID{CalorimeterHit}, isnewpodio() ? Symbol(:_, bname, "_rec") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, isnewpodio() ? Symbol(:_, bname, "_sim") : Symbol(bname, "#1")}(evt, collid, len),
StructArray{ObjectID{CalorimeterHit}, Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, Symbol(bname, "#1")}(evt, collid, len),
)
return StructArray{MCRecoCaloParticleAssociation}(columns)
end
Expand Down Expand Up @@ -256,8 +256,8 @@ function StructArray{ReconstructedParticle, bname}(evt::UnROOT.LazyEvent, collid
StructArray{Relation{ReconstructedParticle,Track,2}, Symbol(bname, :_tracks)}(evt, collid, len),
StructArray{Relation{ReconstructedParticle,ReconstructedParticle,3}, Symbol(bname, :_particles)}(evt, collid, len),
StructArray{Relation{ReconstructedParticle,ParticleID,4}, Symbol(bname, :_particleIDs)}(evt, collid, len),
StructArray{ObjectID{Vertex}, isnewpodio() ? Symbol(:_, bname, "_startVertex") : Symbol(bname, "#4")}(evt, collid, len),
StructArray{ObjectID{ParticleID}, isnewpodio() ? Symbol(:_, bname, "_particleIDUsed") : Symbol(bname, "#5")}(evt, collid, len),
StructArray{ObjectID{Vertex}, Symbol(bname, "#4")}(evt, collid, len),
StructArray{ObjectID{ParticleID}, Symbol(bname, "#5")}(evt, collid, len),
)
return StructArray{ReconstructedParticle}(columns)
end
Expand All @@ -275,7 +275,7 @@ function StructArray{SimPrimaryIonizationCluster, bname}(evt::UnROOT.LazyEvent,
StructArray{PVector{SimPrimaryIonizationCluster,Vector3d,3}, Symbol(bname, :_electronPosition)}(evt, collid, len),
StructArray{PVector{SimPrimaryIonizationCluster,Float32,4}, Symbol(bname, :_pulseTime)}(evt, collid, len),
StructArray{PVector{SimPrimaryIonizationCluster,Float32,5}, Symbol(bname, :_pulseAmplitude)}(evt, collid, len),
StructArray{ObjectID{MCParticle}, isnewpodio() ? Symbol(:_, bname, "_MCParticle") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, Symbol(bname, "#0")}(evt, collid, len),
)
return StructArray{SimPrimaryIonizationCluster}(columns)
end
Expand Down Expand Up @@ -318,8 +318,8 @@ function StructArray{RecoParticleVertexAssociation, bname}(evt::UnROOT.LazyEvent
len = length(firstmem)
columns = (StructArray{ObjectID{RecoParticleVertexAssociation}}((collect(0:len-1),fill(collid,len))),
firstmem,
StructArray{ObjectID{ReconstructedParticle}, isnewpodio() ? Symbol(:_, bname, "_rec") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{Vertex}, isnewpodio() ? Symbol(:_, bname, "_vertex") : Symbol(bname, "#1")}(evt, collid, len),
StructArray{ObjectID{ReconstructedParticle}, Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{Vertex}, Symbol(bname, "#1")}(evt, collid, len),
)
return StructArray{RecoParticleVertexAssociation}(columns)
end
Expand All @@ -333,7 +333,7 @@ function StructArray{RecDqdx, bname}(evt::UnROOT.LazyEvent, collid = UInt32(0),
getproperty(evt, Symbol(bname, :_type)),
StructArray{SVector{5,Hypothesis}}(reshape(getproperty(evt, Symbol(bname, "_hypotheses[5]")), 5, len);dims=1),
StructArray{PVector{RecDqdx,HitLevelData,1}, Symbol(bname, :_hitData)}(evt, collid, len),
StructArray{ObjectID{Track}, isnewpodio() ? Symbol(:_, bname, "_track") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{Track}, Symbol(bname, "#0")}(evt, collid, len),
)
return StructArray{RecDqdx}(columns)
end
Expand Down Expand Up @@ -369,8 +369,8 @@ function StructArray{MCRecoTrackerAssociation, bname}(evt::UnROOT.LazyEvent, col
len = length(firstmem)
columns = (StructArray{ObjectID{MCRecoTrackerAssociation}}((collect(0:len-1),fill(collid,len))),
firstmem,
StructArray{ObjectID{TrackerHit}, isnewpodio() ? Symbol(:_, bname, "_rec") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{SimTrackerHit}, isnewpodio() ? Symbol(:_, bname, "_sim") : Symbol(bname, "#1")}(evt, collid, len),
StructArray{ObjectID{TrackerHit}, Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{SimTrackerHit}, Symbol(bname, "#1")}(evt, collid, len),
)
return StructArray{MCRecoTrackerAssociation}(columns)
end
Expand All @@ -393,8 +393,8 @@ function StructArray{MCRecoClusterParticleAssociation, bname}(evt::UnROOT.LazyEv
len = length(firstmem)
columns = (StructArray{ObjectID{MCRecoClusterParticleAssociation}}((collect(0:len-1),fill(collid,len))),
firstmem,
StructArray{ObjectID{Cluster}, isnewpodio() ? Symbol(:_, bname, "_rec") : Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, isnewpodio() ? Symbol(:_, bname, "_sim") : Symbol(bname, "#1")}(evt, collid, len),
StructArray{ObjectID{Cluster}, Symbol(bname, "#0")}(evt, collid, len),
StructArray{ObjectID{MCParticle}, Symbol(bname, "#1")}(evt, collid, len),
)
return StructArray{MCRecoClusterParticleAssociation}(columns)
end
Expand Down
Loading

0 comments on commit 7d97dae

Please sign in to comment.