-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathanalyzer.py
executable file
·119 lines (99 loc) · 4.82 KB
/
analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import sys
import json
import glob
import os
import tempfile
from datetime import datetime
from os import path
from os.path import join
from sys import argv, stdout, stderr
from argparse import ArgumentParser
from fabric import Connection
from configparser import ConfigParser
from collections import OrderedDict
from code_analyzer.ast_analyzer import analyze_projects
# https://stackoverflow.com/questions/5574702/how-to-print-to-stderr-in-python
def error_print(*args, **kwargs):
    """Print ``args`` to the ``stderr`` stream imported from ``sys``.

    Extra keyword arguments are forwarded to ``print`` unchanged. Passing
    ``file`` is not supported because the destination is fixed here.
    """
    print(*args, **kwargs, file=stderr)
def info_print(*args, **kwargs):
    """Print ``args`` to the ``stdout`` stream imported from ``sys``.

    Extra keyword arguments are forwarded to ``print`` unchanged. Passing
    ``file`` is not supported because the destination is fixed here.
    """
    print(*args, **kwargs, file=stdout)
def export_projects(projects, name):
    """Serialize ``projects`` as JSON (2-space indent) into the file ``name``.

    The file is opened in write mode, so any existing content is replaced.
    """
    destination = open(name, mode='w')
    with destination:
        json.dump(projects, destination, indent=2)
# change default behavior of ConfigParser
# instead of overwriting sections with same key,
# accumulate the results
class multidict(OrderedDict):
    """OrderedDict that merges dict values instead of overwriting them.

    Assigning a ``dict`` to a key that already holds a ``dict`` stores a new
    plain ``dict`` containing the old entries updated with the new ones (new
    values win on conflicting inner keys). Every other assignment behaves
    like a normal ``OrderedDict`` assignment.
    """

    def __setitem__(self, key, val):
        """Store ``val`` under ``key``, merging when both old and new are dicts."""
        if isinstance(val, dict) and key in self:
            existing = self[key]
            # Only merge mapping-with-mapping; a non-dict stored value cannot
            # be unpacked and is simply replaced.
            if isinstance(existing, dict):
                val = {**existing, **val}
        # Always go through the base-class setter: dict.update() accepts a
        # single positional argument, so it cannot be used as update(key, value).
        OrderedDict.__setitem__(self, key, val)
def open_config(parsed_args, exec_dir):
    """Load the application config file and return the parser.

    ``parsed_args.config_file`` is joined onto ``exec_dir``. If no file exists
    at that location, a message is emitted through ``error_print`` and the
    process exits with status 1. Otherwise the file is read into a non-strict
    ``ConfigParser`` that uses ``multidict`` as its dict type.
    """
    default_cfg = parsed_args.config_file
    config_path = path.join(exec_dir, default_cfg)

    # Main config file
    if not path.exists(config_path):
        error_print(f"Config file {default_cfg} not found! Abort.")
        sys.exit(1)
    info_print(f"Opening config file {default_cfg}")

    parser = ConfigParser(dict_type=multidict, strict=False)
    parser.read([config_path])
    return parser
def _fetch_projects_info(cfg, path_to_collection):
    """Download and parse ``build_summary.json`` from the remote collection.

    Connects to ``cfg['remote']['host']`` as ``cfg['remote']['user']`` and
    copies the summary into a local temporary file. If that first attempt
    fails for any reason, the remote folder is listed, the summary is fetched
    again, and only the entries whose name matches a ``<name>.tar.gz`` file in
    the listing are kept.

    Returns the parsed JSON (filtered to a dict in the fallback case). Any
    exception from the fallback path propagates to the caller. The temporary
    file is always removed.
    """
    # TODO: for now, the analyzer looks for ASTs on a remote storage server. Make it work for local as well
    user = cfg['remote']['user']
    host = cfg['remote']['host']

    # mkstemp returns an OPEN descriptor; close it right away since only the
    # path is needed, otherwise the descriptor leaks.
    fd, temp_file_path = tempfile.mkstemp()
    os.close(fd)
    try:
        with Connection(host=host, user=user) as connection:
            try:
                connection.get(remote=join(path_to_collection, 'build_summary.json'), local=temp_file_path)
                with open(temp_file_path, "r") as tmp_file:
                    return json.load(tmp_file)
            except Exception as e:
                # Best-effort fallback; report why the direct read failed
                # instead of swallowing the reason silently.
                error_print(f'Direct read of build_summary failed ({e}); retrying with archive listing')
                listing = connection.run(f"ls {path_to_collection}", hide=True)
                print(listing.stdout)
                # Strip the 7-character '.tar.gz' suffix to get project names.
                archived = {
                    entry[:-7]
                    for entry in listing.stdout.strip().split()
                    if entry.endswith('.tar.gz')
                }
                connection.get(remote=f'{path_to_collection}/build_summary.json', local=temp_file_path)
                with open(temp_file_path, "r") as fin:
                    return {k: v for (k, v) in json.load(fin).items() if k in archived}
    finally:
        # Clean up on success and on failure alike.
        os.unlink(temp_file_path)


def main():
    """Command-line entry point: load config, fetch the build summary, analyze.

    Parses the command line, opens the config file located next to this
    script, stores ``--ast-archive`` in ``cfg['analyze']['ast_archive']``,
    reads ``build_summary.json`` from the remote collection folder and hands
    everything to ``analyze_projects``. Exits with status 1 if the build
    summary cannot be read.
    """
    parser = ArgumentParser(description='Analyze the AST')
    parser.add_argument('collection_path', type=str,
                        help='folder where the AST archives are stored')
    parser.add_argument('--config-file', dest='config_file', default='analyze.cfg', action='store', help='Application config file')
    parser.add_argument('--ast-archive', dest='ast_archive', help='Folder where the AST archives are stored/downloaded to', default="ast_archive")
    parser.add_argument('--results-dir', dest='results_dir', help="Folder where the cxx-langstat output is going to be saved", default='analyze')
    parsed_args = parser.parse_args(argv[1:])

    cfg = open_config(parsed_args, path.dirname(path.realpath(__file__)))
    if parsed_args.ast_archive:
        cfg['analyze']['ast_archive'] = parsed_args.ast_archive

    path_to_collection = join(cfg['remote']['preceding_path_to_collection'], parsed_args.collection_path)
    print(f"path_to_collection: {path_to_collection}")

    try:
        projects_info = _fetch_projects_info(cfg, path_to_collection)
    except Exception as e:
        error_print(f'Failed to read build_summary: {e}')
        sys.exit(1)

    analyze_projects(path_to_collection=path_to_collection,
                     ast_archive_root=parsed_args.ast_archive,
                     results_dir_root=parsed_args.results_dir,
                     projects_info=projects_info,
                     cfg=cfg)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()