-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
175 lines (162 loc) · 6.79 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import sys
import time
import yaml
from testplans.testplan import Testplan
from lib.boto3sdk import upload
from lib.Logger import Logger
def generate():
    """Validate the global benchmark config and generate the workload data.

    Reads config/config.yaml, checks that the ``engine``, ``workload`` and
    ``test_plan`` keys hold supported values, then (for the tpc-h workload)
    runs data generation if the workload's ``generate`` switch is enabled.
    Logs an error and returns early on any invalid/missing config key.
    """
    # 1. Read yaml and check validity
    # `with` guarantees the config file handle is closed (the original
    # left it open for the life of the process).
    with open("config/config.yaml", encoding="UTF-8") as global_conf_file:
        global_conf = yaml.load(global_conf_file, Loader=yaml.FullLoader)
    try:
        if global_conf['engine'] not in ['spark-sql', 'kylin', 'presto', 'custom']:
            logger.error("Failed: invalid engine type")
            return
        if global_conf['workload'] not in ['tpc-h', 'custom']:
            logger.error("Failed: invalid workload type")
            return
        if global_conf['test_plan'] not in ['one-pass', 'one-pass-concurrent', 'one-offline-multi-online', 'custom']:
            logger.error("Failed: invalid test plan type")
            return
    except KeyError:
        # One of the three required keys is absent from config.yaml.
        logger.error("Failed: incomplete key-value pairs")
        return
    # 2. Generate the workload
    logger.info("Generating workload...")
    if global_conf['workload'] == 'tpc-h':
        with open("config/workloads/tpch.yaml", encoding="UTF-8") as conf_file:
            conf = yaml.load(conf_file, Loader=yaml.FullLoader)
        from workloads.tpch import tpch
        workload = tpch()
        workload.set_switch(conf['switch'])
        workload.set_conf(conf['config'])
        # Data generation is expensive, so it is gated behind a switch.
        if conf['switch']['generate'] is True:
            workload.generate()
            logger.info("Generating data successful!")
        else:
            logger.info("Generating data skipped!")
def run():
    """Launch the engine, build the workload and test plan, execute queries,
    and persist/upload the collected offline/online timing metrics.

    Reads config/config.yaml for the engine/workload/test_plan selection,
    then loads the matching per-component yaml config. Timing results are
    written to ./offline_times and ./online_times and uploaded via boto3.
    Assumes `generate()` already validated the config and produced the data.
    """
    start = time.time()
    with open("config/config.yaml", encoding="UTF-8") as global_conf_file:
        global_conf = yaml.load(global_conf_file, Loader=yaml.FullLoader)
    # 3. Launch the engine
    logger.info("Launching the engine...")
    # Initialise `conf` so the custom-engine branch (which loads no yaml)
    # falls through to the KeyError handler below instead of raising
    # NameError at `conf['config']`, as the original code did.
    conf = {}
    if global_conf['engine'] == 'spark-sql':
        with open("config/engines/spark-sql.yaml", encoding="UTF-8") as conf_file:
            conf = yaml.load(conf_file, Loader=yaml.FullLoader)
        from engines.sparksql import sparksql
        engine = sparksql()
        engine.set_app_name(conf['name'])
    elif global_conf['engine'] == 'kylin':
        with open("config/engines/kylin.yaml", encoding="UTF-8") as conf_file:
            conf = yaml.load(conf_file, Loader=yaml.FullLoader)
        from engines.kylin import kylin
        engine = kylin()
    elif global_conf['engine'] == 'presto':
        # FIX: the original loaded config/engines/spark-sql.yaml here,
        # feeding spark-sql settings to the presto engine (copy-paste bug).
        # NOTE(review): confirm config/engines/presto.yaml exists in the repo.
        with open("config/engines/presto.yaml", encoding="UTF-8") as conf_file:
            conf = yaml.load(conf_file, Loader=yaml.FullLoader)
        from engines.presto import presto
        engine = presto()
    else:
        # 'custom' engine: generic engine with no yaml config of its own.
        from engines.engine import engine
        engine = engine()
    try:
        engine.set_conf(conf['config'])
    except KeyError:
        # No 'config' section (or no config file at all): use defaults.
        engine.set_conf({})
    engine.launch()
    logger.info("Engine launched successful!")
    logger.info("--------------------------------")
    # 4. Generate warehouse
    logger.info("Generating warehouse...")
    if global_conf['workload'] == 'tpc-h':
        with open("config/workloads/tpch.yaml", encoding="UTF-8") as conf_file:
            conf = yaml.load(conf_file, Loader=yaml.FullLoader)
        from workloads.tpch import tpch
        workload = tpch()
        workload.set_switch(conf['switch'])
        workload.set_conf(conf['config'])
        if conf['switch']['create'] is True:
            workload.create()
            logger.info("Creating data tables successful!")
        else:
            logger.info("Creating data tables skipped!")
        if conf['switch']['load'] is True:
            workload.load()
            logger.info("Loading data to warehouse successful!")
        else:
            logger.info("Loading data to warehouse skipped!")
        workload.set_query(conf['query'])
    else:
        from workloads.workload import workload
        workload = workload()
    logger.info("Workload generated!")
    logger.info("--------------------------------")
    # 5. Generate the execution plan
    logger.info("Generating execution plan...")
    # Base plan used for the 'custom' test plan; overwritten below otherwise.
    plan = Testplan()
    # FIX: these were independent `if`s with a dangling `else: pass` that
    # only attached to the last test; an elif chain makes the mutually
    # exclusive dispatch explicit.
    if global_conf['test_plan'] == 'one-pass':
        with open("config/testplans/one-pass.yaml", encoding="UTF-8") as conf_file:
            conf = yaml.load(conf_file, Loader=yaml.FullLoader)
        from testplans.onepass import One_pass_testplan
        plan = One_pass_testplan()
        plan.build(conf)
    elif global_conf['test_plan'] == 'one-pass-concurrent':
        with open("config/testplans/one-pass-concurrent.yaml", encoding="UTF-8") as conf_file:
            conf = yaml.load(conf_file, Loader=yaml.FullLoader)
        from testplans.onepassConcurrent import One_pass_concurrent_testplan
        plan = One_pass_concurrent_testplan()
        plan.build(conf)
    elif global_conf['test_plan'] == 'one-offline-multi-online':
        with open("config/testplans/one-offline-multi-online.yaml", encoding="UTF-8") as conf_file:
            conf = yaml.load(conf_file, Loader=yaml.FullLoader)
        from testplans.oneOfflineMultiOnline import One_offline_multi_online_testplan
        plan = One_offline_multi_online_testplan()
        plan.build(conf)
    logger.info("Generating execution plan successful!")
    logger.info("--------------------------------")
    # 6. Execution and metrics acquisition
    logger.info("Executing queries...")
    plan.start(engine, workload.get_query())
    logger.info("Execution finished!")
    offline_metrics, online_metrics = plan.get_metrics()
    # Flatten the per-run metric lists into single timing lists.
    offline_times = [query for metric in offline_metrics for query in metric]
    online_times = [query for metric in online_metrics for query in metric]
    logger.info("Offline times...")
    logger.info(str(offline_times))
    logger.info("--------------------------------")
    logger.info("Online times...")
    logger.info(str(online_times))
    logger.info("--------------------------------")
    with open("./offline_times", 'w', encoding='utf-8') as f:
        print(str(offline_times), file=f)
    with open("./online_times", 'w', encoding='utf-8') as f:
        print(str(online_times), file=f)
    upload("./offline_times", "olapstorage", "tmp/offline_times")
    upload("./online_times", "olapstorage", "tmp/online_times")
    finish = time.time()
    logger.info("Job started at: " + str(start))
    logger.info("Job finished at: " + str(finish))
    logger.info("Please acquire other metrics on the monitor.")
    logger.info("--------------------------------")
if __name__ == '__main__':
    # CLI entry point: exactly one argument selects the phase to run.
    logger = Logger('./log/benchmark.log', 'main')
    if len(sys.argv) != 2:
        logger.error("Invalid number of arguments!")
        logger.error("Usage: generate | run")
    elif sys.argv[1] == "generate":
        generate()
    elif sys.argv[1] == "run":
        run()
    else:
        logger.error("Invalid argument: " + sys.argv[1])
        logger.error("Usage: generate | run")