forked from nsmith-/TTGamma_LongExercise
-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathrunFullDataset.py
executable file
·158 lines (138 loc) · 5.41 KB
/
runFullDataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python3
import uproot
import datetime
import logging
import hist
from coffea import util, processor
from coffea.nanoevents import NanoAODSchema
from ttgamma import TTGammaProcessor
from ttgamma.utils.fileset2021 import fileset
from ttgamma.utils.crossSections import lumis, crossSections
import time
import sys
import os
import argparse
import warnings
from numba.core.errors import NumbaDeprecationWarning
# Silence warnings that would otherwise flood the processing logs.
# NOTE(review): "Found duplicate branch" is presumably emitted while reading the
# NanoAOD input files (uproot/coffea) — confirm which library raises it.
warnings.filterwarnings("ignore", "Found duplicate branch")
# Hide numba deprecation notices (NumbaDeprecationWarning is imported from numba above).
warnings.simplefilter('ignore', category=NumbaDeprecationWarning)
# Group the individual MC samples of ``fileset`` into the process names that
# can be passed as ``mcGroup`` on the command line (used by every executor,
# not only condor).  Most groups are selected by substring match on the
# sample key; the ttbar groups list their samples explicitly.
mc_group_mapping = {
    "MCTTGamma": [name for name in fileset if "TTGamma" in name],
    "MCTTbar1l": ["TTbarPowheg_Semilept", "TTbarPowheg_Hadronic"],
    "MCTTbar2l": ["TTbarPowheg_Dilepton"],
    "MCSingleTop": [name for name in fileset if "ST" in name],
    "MCZJets": [name for name in fileset if "DY" in name],
    # W+jets samples are split by jet multiplicity: W1..W4.
    "MCWJets": [
        name
        for name in fileset
        if any(tag in name for tag in ("W1", "W2", "W3", "W4"))
    ],
}

# Every sample already claimed by one of the groups above.
mc_nonother = set().union(*mc_group_mapping.values())

# Remaining simulation samples (anything unclaimed that is not real data).
mc_group_mapping["MCOther"] = [
    name for name in fileset if name not in mc_nonother and "Data" not in name
]

# Concatenation of all groups, MCOther included, in group order.
mc_group_mapping["MCAll"] = [
    name for members in mc_group_mapping.values() for name in members
]
def _build_executor(args):
    """Create the coffea executor selected by ``--executor``.

    Reads ``args.executor``, ``args.workers`` and ``args.batch``.

    Returns:
        ``(executor, cluster)`` where ``cluster`` is the ``LPCCondorCluster``
        for the ``lpcjq`` executor and ``None`` otherwise, so the caller can
        shut it down once processing is finished.

    Raises:
        RuntimeError: if more than 4 workers are requested for local running.
    """
    if args.executor == "local":
        if args.workers > 4:
            raise RuntimeError("You probably shouldn't run more than 4 cores locally at LPC")
        executor = processor.FuturesExecutor(
            workers=args.workers, status=not args.batch
        )
        return executor, None

    if args.executor == "debug":
        return processor.IterativeExecutor(status=not args.batch), None

    # Only "lpcjq" remains: argparse restricts --executor to the three choices.
    from distributed import Client
    from lpcjobqueue import LPCCondorCluster

    if args.workers == 1:
        print("Are you sure you want to use only one worker?")
    cluster = LPCCondorCluster(
        transfer_input_files="ttgamma",
        # memory="4GB",
        # log_directory="/uscms/home/ncsmith/dask_logs",
    )
    cluster.adapt(minimum=1, maximum=args.workers)
    executor = processor.DaskExecutor(client=Client(cluster), status=not args.batch)
    return executor, cluster


def _input_event_count(filenames):
    """Sum ``hEvents`` bin 2 minus bin 0 over all *filenames*.

    NOTE(review): bins 2 and 0 are presumably the positive- and negative-weight
    generated-event counts written at skim time — confirm against the skimmer.

    Returns:
        A ``processor.value_accumulator`` holding the summed count.
    """
    total = processor.value_accumulator(int)
    for filename in filenames:
        with uproot.open(filename) as fhandle:
            counts = fhandle["hEvents"].values()
            total += counts[2] - counts[0]
    return total


def _normalize_to_lumi(output, job_fileset):
    """Scale each MC dataset's histograms to the 2016 integrated luminosity.

    For every dataset in *job_fileset*, store the input event count as
    ``output[dataset]["InputEventCount"]``. Then multiply each ``hist.Hist``
    in that dataset's output, in place, by
    ``crossSections[dataset] * lumis[2016] / InputEventCount``.

    NOTE(review): the count covers every event of every input file, so with
    ``--maxchunks`` (partial processing) the normalization will be too small.

    Raises:
        RuntimeError: if a dataset's input event count is not positive, which
            would otherwise yield an infinite or meaningless scale factor.
    """
    for dataset_name, dataset_files in job_fileset.items():
        dataset_output = output[dataset_name]
        input_count = _input_event_count(dataset_files)
        dataset_output["InputEventCount"] = input_count

        if input_count.value <= 0:
            raise RuntimeError(
                f"Non-positive input event count for {dataset_name}; "
                "cannot compute luminosity scale factor"
            )
        lumi_sf = crossSections[dataset_name] * lumis[2016] / input_count.value

        for obj in dataset_output.values():
            if isinstance(obj, hist.Hist):
                # In-place: hist.Hist implements __imul__, so the stored
                # histogram object itself is rescaled.
                obj *= lumi_sf


if __name__ == "__main__":
    logging.basicConfig(
        format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
        level=logging.WARNING,
    )

    parser = argparse.ArgumentParser(
        description="Batch processing script for ttgamma analysis"
    )
    parser.add_argument(
        "mcGroup",
        choices=list(mc_group_mapping) + ["Data"],
        help="Name of process to run",
    )
    parser.add_argument("--chunksize", type=int, default=10000, help="Chunk size")
    parser.add_argument("--maxchunks", type=int, default=None, help="Max chunks")
    parser.add_argument("--workers", type=int, default=1, help="Number of workers")
    parser.add_argument("--outdir", type=str, default="Outputs", help="Where to put the output files")
    parser.add_argument(
        "--batch", action="store_true", help="Batch mode (no progress bar)"
    )
    parser.add_argument(
        "-e",
        "--executor",
        choices=["local", "lpcjq", "debug"],
        default="local",
        help="How to run the processing",
    )
    args = parser.parse_args()

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(args.outdir, exist_ok=True)

    tstart = time.time()
    print("Running mcGroup {}".format(args.mcGroup))

    executor, cluster = _build_executor(args)
    try:
        runner = processor.Runner(
            executor=executor,
            schema=NanoAODSchema,
            chunksize=args.chunksize,
            maxchunks=args.maxchunks,
        )

        is_mc = args.mcGroup != "Data"
        if is_mc:
            job_fileset = {key: fileset[key] for key in mc_group_mapping[args.mcGroup]}
        else:
            job_fileset = {key: fileset[key] for key in fileset if "Data" in key}

        output = runner(
            job_fileset,
            treename="Events",
            processor_instance=TTGammaProcessor(isMC=is_mc),
        )

        # Cross-section/luminosity normalization only makes sense for
        # simulation; data datasets have no cross section to scale by.
        if is_mc:
            _normalize_to_lumi(output, job_fileset)

        elapsed = time.time() - tstart
        nevt = sum(h["EventCount"] for h in output.values())
        print(f"Total time: {elapsed:.1f} seconds")
        print("Total rate: %.1f events / second" % (nevt / elapsed))

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        outfile = os.path.join(args.outdir, f"output_{args.mcGroup}_run{timestamp}.coffea")
        util.save(output, outfile)
        print(f"Saved output to {outfile}")
    finally:
        # Release the condor workers even when processing raises.
        if cluster is not None:
            cluster.close()