-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement fuzzing driver using cobrafuzz
Ref. eng/recordflux/RecordFlux#1305
- Loading branch information
Showing
5 changed files
with
207 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,3 +22,4 @@ htmlcov | |
pyproject.toml | ||
why3session.xml.bak | ||
why3shapes.gz.bak | ||
/crashes |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
import logging | ||
import sys | ||
from pathlib import Path | ||
|
||
import pytest | ||
|
||
import rflx.specification.parser | ||
from tests.const import SPEC_DIR | ||
from tools import fuzz_driver | ||
|
||
|
||
class UnexpectedError(Exception): | ||
pass | ||
|
||
|
||
def test_no_bug( | ||
monkeypatch: pytest.MonkeyPatch, | ||
tmpdir: Path, | ||
caplog: pytest.LogCaptureFixture, | ||
) -> None: | ||
with monkeypatch.context() as mp: | ||
mp.setattr( | ||
sys, | ||
"argv", | ||
[ | ||
"fuzz_driver.py", | ||
"--state-dir", | ||
str(tmpdir), | ||
"--corpus-dir", | ||
str(SPEC_DIR), | ||
"--runs", | ||
"10", | ||
], | ||
) | ||
|
||
with caplog.at_level(logging.INFO), pytest.raises(SystemExit, match="^0$"): | ||
fuzz_driver.main() | ||
assert "did 10 runs, stopping now." in caplog.text | ||
|
||
|
||
def test_unexpected_error( | ||
monkeypatch: pytest.MonkeyPatch, | ||
tmpdir: Path, | ||
caplog: pytest.LogCaptureFixture, | ||
) -> None: | ||
def raise_unexpected_exception() -> None: | ||
raise UnexpectedError("Unexpected error") | ||
|
||
with monkeypatch.context() as mp: | ||
mp.setattr( | ||
sys, | ||
"argv", | ||
[ | ||
"fuzz_driver.py", | ||
"--state-dir", | ||
str(tmpdir), | ||
"--corpus-dir", | ||
str(SPEC_DIR), | ||
"--artifact-file", | ||
str(tmpdir / "crash"), | ||
"--runs", | ||
"10", | ||
], | ||
) | ||
mp.setattr( | ||
rflx.specification.Parser, | ||
"parse_string", | ||
lambda _c, _s: raise_unexpected_exception(), | ||
) | ||
|
||
with caplog.at_level(logging.INFO), pytest.raises(SystemExit, match="^76$"): | ||
fuzz_driver.main() | ||
assert f'sample was written to {tmpdir/ "crash"}' in caplog.text | ||
|
||
|
||
def test_decode_error( | ||
monkeypatch: pytest.MonkeyPatch, | ||
tmpdir: Path, | ||
caplog: pytest.LogCaptureFixture, | ||
) -> None: | ||
def raise_decode_error() -> None: | ||
raise UnicodeDecodeError( | ||
"fakeenc", | ||
b"deafbeef", | ||
1, | ||
2, | ||
"Error", | ||
) | ||
|
||
with monkeypatch.context() as mp: | ||
mp.setattr( | ||
sys, | ||
"argv", | ||
[ | ||
"fuzz_driver.py", | ||
"--state-dir", | ||
str(tmpdir), | ||
"--corpus-dir", | ||
str(SPEC_DIR), | ||
"--artifact-file", | ||
str(tmpdir / "crash"), | ||
"--runs", | ||
"10", | ||
], | ||
) | ||
mp.setattr( | ||
rflx.specification.Parser, | ||
"parse_string", | ||
lambda _c, _s: raise_decode_error(), | ||
) | ||
|
||
with caplog.at_level(logging.INFO), pytest.raises(SystemExit, match="^0$"): | ||
fuzz_driver.main() | ||
assert "did 10 runs, stopping now." in caplog.text |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import argparse | ||
import random | ||
import sys | ||
from pathlib import Path | ||
|
||
from cobrafuzz.fuzzer import Fuzzer | ||
|
||
from rflx import error | ||
from rflx.model import Cache, Digest | ||
from rflx.specification import parser | ||
|
||
|
||
class SilentlyNeverVerify(Cache): | ||
def __init__(self) -> None: | ||
pass | ||
|
||
def is_verified(self, _: Digest) -> bool: | ||
return True | ||
|
||
def add_verified(self, digest: Digest) -> None: | ||
pass | ||
|
||
|
||
def fuzz(buf: bytes) -> None: | ||
try: | ||
string = buf.decode("utf-8") | ||
p = parser.Parser(cache=SilentlyNeverVerify()) | ||
p.parse_string(string) | ||
p.create_model() | ||
except (UnicodeDecodeError, error.RecordFluxError): | ||
pass | ||
except KeyboardInterrupt: # pragma: no cover | ||
sys.exit() | ||
|
||
|
||
def main() -> None: | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument( | ||
"--state-dir", | ||
type=Path, | ||
required=True, | ||
help="Directory to hold fuzzer state", | ||
) | ||
parser.add_argument( | ||
"--corpus-dir", | ||
type=Path, | ||
required=True, | ||
help="Directory to extract corpus from", | ||
) | ||
parser.add_argument( | ||
"--artifact-file", | ||
type=Path, | ||
help="Store single artifact in given file", | ||
) | ||
parser.add_argument( | ||
"--runs", | ||
type=int, | ||
default=-1, | ||
help="Maxium number of runs", | ||
) | ||
parser.add_argument( | ||
"--timeout", | ||
type=int, | ||
default=30, | ||
help="Time after which an execution is considered a hang", | ||
) | ||
arguments = parser.parse_args() | ||
|
||
corpus = [Path(p) for p in arguments.corpus_dir.glob("**/*.rflx")] | ||
random.shuffle(corpus) | ||
|
||
fuzzer = Fuzzer( | ||
fuzz, | ||
dirs=[Path(arguments.state_dir), *corpus], | ||
timeout=arguments.timeout, | ||
runs=arguments.runs, | ||
exact_artifact_path=arguments.artifact_file, | ||
) | ||
fuzzer.start() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() # pragma: no cover |