-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathaudit.py
269 lines (221 loc) · 8.11 KB
/
audit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#!/usr/bin/env python3.7
import os
import sys
import json
from subprocess import CalledProcessError, check_call, Popen, DEVNULL, PIPE
import github3 as gh3
from datetime import date
# Absolute path of the directory containing this script. The paths below are
# all derived from it.
SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
# Directory holding the git checkouts that `audit()` clones or pulls.
WORK = os.path.join(SCRIPT_DIR, "work")
# Script-local Rust toolchain directories. The `__main__` block exports these
# as the `CARGO_HOME` and `RUSTUP_HOME` environment variables.
CARGO_HOME = os.path.join(SCRIPT_DIR, ".cargo")
RUSTUP_HOME = os.path.join(SCRIPT_DIR, ".rustup")
# The `cargo` binary inside `CARGO_HOME`.
CARGO = os.path.join(CARGO_HOME, "bin", "cargo")

# NOTE(review): the next four constants are not referenced by any code visible
# in this file -- confirm whether they are still needed.
SOFTDEV_GH = "https://github.com/softdevteam"
RUSTUP_URL = "https://sh.rustup.rs/"
GH_API_HOST = "api.github.com"
GH_API_REPOS = "/users/softdevteam/repos"

# Accounts to search for things to audit.
AUDIT_ACCOUNTS = ["softdevteam", "ykjit"]

# Repos to leave out of the audit, as (owner, repo-name) tuples. Add an entry
# here to skip a repo.
SKIP_REPOS = [
    # Skipping rustc forks for now, as at the time of writing upstream rust
    # (and thus our forks) always have vulns which are out of our control.
    # Once `cargo audit` passes on upstream rust, we can reconsider these.
    ("softdevteam", "ykrustc"),
    ("softdevteam", "rustgc"),
    ("softdevteam", "rustgc_paper"),
    ("softdevteam", "rustgc_paper_experiment"),
    ("softdevteam", "alloy"),
    ("softdevteam", "alloy_hashbrown"),
    # ykllvm contains rust files and thus gets categorised as a rust repo.
    ("ykjit", "ykllvm"),
    # Unmaintained repos.
    ("softdevteam", "k2"),
    ("softdevteam", "error_recovery_experiment"),
    # Externally maintained.
    ("softdevteam", "WLambda"),
]

# Security advisories to skip.
#   (repo-name, problem-package, rustsec-id) -> expiry-date
# The expiry date is a `datetime.date`, e.g. `date(2021, 12, 2)`. A skip stops
# applying once its expiry date is reached.
#
# Wild cards (`*`) are allowed in the fields of the key tuple.
#
# XXX the keys of this map should also contain the account that owns the repo,
# in case different accounts contain a repo by the same name.
SKIP_ADVISORIES = {
}

# Skip keys that no reported advisory has matched yet. Matched keys are removed
# by `should_skip_advisory()`; whatever is left is printed as a warning at the
# end of the run.
UNMATCHED_SKIP_ADVISORIES = set(SKIP_ADVISORIES.keys())

# Repos which require the audit to run in a sub-dir.
# Maps an (owner, repo-name) tuple to a collection of path components suitable
# for use with `os.path.join()`.
# NOTE(review): `audit()` does not join the components; it iterates over the
# collection and changes into each entry in turn. Confirm which meaning is
# intended before adding an entry with more than one element.
CUSTOM_AUDIT_DIRS = {
    ("ykjit", "ykcbf"): ["lang_tests"],
    ("ykjit", "yk-benchmarks"): ["reporter"],
}
def should_skip_advisory(adv_tup):
    """Decide whether a reported advisory is covered by an unexpired skip.

    `adv_tup` is a `(repo-name, problem-package, rustsec-id)` tuple. It is
    looked up in `SKIP_ADVISORIES`, first verbatim and then against keys that
    use `*` wild cards (the first matching wild card key wins). A key that
    matches is dropped from `UNMATCHED_SKIP_ADVISORIES`, so that skips which
    never matched anything can be reported later.

    Returns `True` only when a matching skip exists and its expiry date is
    still in the future, otherwise `False`. A note is printed when a skip is
    applied or has expired.
    """
    assert(len(adv_tup) == 3)

    expiry = None
    if adv_tup in SKIP_ADVISORIES:
        # Exact match. Use `discard`, not `remove`: the same advisory can be
        # reported more than once in a run (e.g. a repo audited in several
        # directories), and the key is only in the unmatched set the first
        # time round.
        UNMATCHED_SKIP_ADVISORIES.discard(adv_tup)
        expiry = SKIP_ADVISORIES[adv_tup]
    else:
        # Now try wildcard matching: a `*` field in the key matches anything,
        # every other field must be equal.
        for skip_tup, skip_expiry in SKIP_ADVISORIES.items():
            assert(len(skip_tup) == 3)
            if all(want == "*" or want == got
                   for want, got in zip(skip_tup, adv_tup)):
                UNMATCHED_SKIP_ADVISORIES.discard(skip_tup)
                expiry = skip_expiry
                break

    if expiry is None:
        return False

    _, pkg, adv_id = adv_tup
    if expiry <= date.today():
        print(f"Note: skip for {pkg}/{adv_id} has expired.")
        return False

    print(f"Note: {pkg}/{adv_id} was skipped.")
    return True
def get_sd_rust_repos(token_file):
    """Get a list of unarchived Rust repos owned by the audited accounts.

    `token_file` is the path of a file holding a GitHub token; surrounding
    whitespace is stripped before it is used to log in.

    A repo is returned when its owner is in `AUDIT_ACCOUNTS`, it is not
    archived, its (owner, name) pair is not in `SKIP_REPOS`, and "Rust" is
    among the languages reported for it.
    """
    with open(token_file) as f:
        token = f.read().strip()
    gh = gh3.login(token=token)

    def wanted(r):
        owner = r.owner.login
        # Cheap attribute checks first, so that the language listing is only
        # requested for repos that could still qualify.
        if owner not in AUDIT_ACCOUNTS or r.archived:
            return False
        if (owner, r.name) in SKIP_REPOS:
            return False
        return any(tup[0] == "Rust" for tup in r.languages())

    return [r for r in gh.repositories() if wanted(r)]
def install_cargo_audit():
    """Install a Rust toolchain via rustup, then install `cargo-audit`.

    Runs three commands in order and raises `CalledProcessError` as soon as
    one of them fails: download the rustup installer to `rustup.sh` in the
    current directory, run it, and `cargo install cargo-audit` using `CARGO`.
    """
    fetch_installer = ["curl", "--proto", "=https", "--tlsv1.2", "-sSf",
                       "https://sh.rustup.rs", "-o", "rustup.sh"]
    run_installer = ["sh", "rustup.sh", "--no-modify-path", "-y"]
    add_cargo_audit = [CARGO, "install", "cargo-audit"]

    for step in (fetch_installer, run_installer, add_cargo_audit):
        check_call(step)
def audit(name, owner, repo):
    """Clone or update one repository and run `cargo audit` on it.

    `name` is the repo name (also the checkout directory name under `WORK`),
    `owner` is the owning account and `repo` is the URL to clone from.

    The audit runs in every directory listed for `(owner, name)` in
    `CUSTOM_AUDIT_DIRS`, or in the top-level directory when there is no such
    entry. Each listed directory is taken relative to the checkout root.

    Returns `True` if every audited directory is clean (after applying
    advisory skips), and `False` if git fails, a directory has no
    `Cargo.toml`, `cargo update` fails, the audit output cannot be parsed, or
    unskipped problems are found.

    Note: this changes the process's current working directory.
    """
    direc = os.path.join(WORK, name)

    # Either clone afresh or update the existing checkout.
    src_exists = os.path.exists(direc)
    if not src_exists:
        os.chdir(WORK)
        git_cmd = ["git", "clone", repo, name]
    else:
        os.chdir(direc)
        git_cmd = ["git", "pull"]

    try:
        check_call(git_cmd)
    except CalledProcessError:
        return False

    os.chdir(direc)
    # Repos which use sub-modules (like Rust forks) need the submodules sources
    # available too.
    try:
        check_call(["git", "submodule", "update"])
    except CalledProcessError:
        return False

    # No custom directories means: just audit the top-level dir.
    dirs = CUSTOM_AUDIT_DIRS.get((owner, name), ["."])

    ok = True
    for audit_dir in dirs:
        # Actually do the audit.
        print(f"Running audit in {audit_dir}")
        # Resolve against the checkout root rather than the current directory,
        # so that a second entry is not looked up inside the first one.
        os.chdir(os.path.join(direc, audit_dir))

        # If there's no Cargo.toml, we can't audit it.
        if not os.path.exists("Cargo.toml"):
            print("No Cargo.toml. Can't audit!")
            ok = False
            continue

        # If we didn't clone afresh and `Cargo.lock` isn't tracked in git, we
        # should run `cargo update` to get the same deps as we would have with
        # a fresh clone.
        if src_exists and os.path.exists("Cargo.lock"):
            try:
                check_call(["git", "ls-files", "--error-unmatch",
                            "Cargo.lock"], stdout=DEVNULL, stderr=DEVNULL)
            except CalledProcessError:
                # `Cargo.lock` not in git. Use the same `CARGO` binary as the
                # audit itself; the toolchain is installed without touching
                # PATH, so a bare `cargo` may be missing or a different one.
                try:
                    check_call([CARGO, "update"])
                except CalledProcessError:
                    # Record the failure for this repo instead of aborting
                    # the whole run.
                    ok = False
                    continue

        p = Popen([CARGO, "audit", "-D", "warnings", "--json"],
                  stdout=PIPE, stderr=PIPE)
        sout, serr = p.communicate()
        try:
            js = json.loads(sout)
        except json.JSONDecodeError as e:
            print(e, file=sys.stderr)
            print(sout, file=sys.stderr)
            print(serr, file=sys.stderr)
            ok = False
            continue

        if not process_json(name, js):
            ok = False
            # Something is wrong. Print human readable output. A non-zero
            # exit status is expected here, so it is deliberately ignored.
            try:
                check_call([CARGO, "audit", "-D", "warnings"])
            except CalledProcessError:
                pass
    return ok
def process_json(repo_name, js):
    """Check the parsed `cargo audit --json` output of one repo.

    Collects a `(repo_name, package, advisory-id)` tuple for every entry in
    `js["warnings"]` (all kinds) and in `js["vulnerabilities"]["list"]`.
    Entries whose `advisory` field is `None` are recorded with dummy
    `(repo_name, None, None)` info. Every distinct tuple is then passed to
    `should_skip_advisory()`.

    Returns `True` if all collected problems are skipped (or there are none),
    `False` otherwise.
    """
    def problem_key(entry):
        advisory = entry["advisory"]
        if advisory is None:
            # If the advisory field is None use dummy info.
            return (repo_name, None, None)
        return (repo_name, advisory["package"], advisory["id"])

    found = set()

    # Warnings come grouped by kind; vulnerabilities are one flat list.
    for group in js["warnings"].values():
        found.update(problem_key(warning) for warning in group)
    found.update(problem_key(vuln) for vuln in js["vulnerabilities"]["list"])

    # Build the full list first so that every problem is checked, even after
    # one has already turned out not to be skippable.
    verdicts = [should_skip_advisory(problem) for problem in found]
    return all(verdicts)
if __name__ == "__main__":
    # Command line: audit.py <token-file> [repo-name]
    try:
        token_file = sys.argv[1]
    except IndexError:
        print("usage: audit.py <token-file> [repo-name]")
        sys.exit(1)

    # The optional second argument restricts the run to repos of that name.
    single_repo = sys.argv[2] if len(sys.argv) > 2 else None

    # Make rustup/cargo use the script-local toolchain directories.
    os.environ["RUSTUP_HOME"] = RUSTUP_HOME
    os.environ["CARGO_HOME"] = CARGO_HOME

    # Look for the toolchain where it is installed (`CARGO_HOME`), not in a
    # `.cargo` relative to whatever directory the script was started from.
    if not os.path.exists(CARGO_HOME):
        install_cargo_audit()

    os.makedirs(WORK, exist_ok=True)

    repos = get_sd_rust_repos(token_file)
    problematic = []
    for r in repos:
        if single_repo and single_repo != r.name:
            continue
        print(f"\n\nChecking {r.clone_url}...")
        if not audit(r.name, r.owner.login, r.clone_url):
            problematic.append(r.name)

    # Skips that matched nothing are probably stale; point them out.
    if UNMATCHED_SKIP_ADVISORIES:
        print("Warning: the following advisory skips were unmatched:")
        for i in UNMATCHED_SKIP_ADVISORIES:
            print(f" {i}")

    # Exit non-zero if any repo failed its audit.
    if problematic:
        print("\n\nThe following repos have problems:")
        for p in problematic:
            print(f" {p}")
        sys.exit(1)