From 83e3e2d50d3031dff4eaffdee7fa0477dcd88603 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20H=C3=A9riveaux?= Date: Fri, 30 Jul 2021 13:52:33 +0200 Subject: [PATCH] Proposal for #236 --- speculos/client.py | 46 ++++++++++++++++++++++++++---------------- tests/apps/test_btc.py | 2 +- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/speculos/client.py b/speculos/client.py index 73f59d5a..b4593c59 100644 --- a/speculos/client.py +++ b/speculos/client.py @@ -33,14 +33,31 @@ def check_status_code(response: requests.Response, url: str) -> None: class ApduResponse: def __init__(self, response: requests.Response) -> None: - self.response = response + self.__response = response - def receive(self) -> bytes: - check_status_code(self.response, "/apdu") - data, status = split_apdu(bytes.fromhex(self.response.json()["data"])) - if status != 0x9000: - raise ApduException(status, data) - return data + def __receive(self): + """ Get the response if not yet received and store it in `__data` and `__status` attributes """ + if self.__response is not None: + check_status_code(self.__response, "/apdu") + apdu_message = bytes.fromhex(self.__response.json()["data"]) + self.__data, self.__status = split_apdu(apdu_message) + self.__response = None + + def data(self, expected_sw: int = 0x9000) -> bytes: + """ + :return: Received data (status word stripped) + :param expected_sw: Expected status word value. + :raises ApduException: If received status word is different from `expected_sw` parameter. + """ + self.__receive() + if self.__status != expected_sw: + raise ApduException(self.__status, self.__data) + return self.__data + + def sw(self) -> int: + """ :return: Received status word """ + self.__receive() + return self.__status def split_apdu(data: bytes) -> Tuple[bytes, int]: @@ -118,13 +135,8 @@ def get_screenshot(self) -> bytes: check_status_code(response, "/screenshot") return response.content - def _apdu_exchange(self, data: bytes) -> bytes: - with self.session.post(f"{self.api_url}/apdu", json={"data": data.hex()}) as response: - apdu_response = ApduResponse(response) - return apdu_response.receive() - - def _apdu_exchange_nowait(self, data: bytes) -> requests.Response: - return self.session.post(f"{self.api_url}/apdu", json={"data": data.hex()}, stream=True) + def _apdu_exchange(self, data: bytes, stream: bool = False) -> requests.Response: + return self.session.post(f"{self.api_url}/apdu", json={"data": data.hex()}, stream=stream) def set_automation_rules(self, rules: dict) -> None: with self.session.post(f"{self.api_url}/automation", json=rules) as response: @@ -195,9 +207,9 @@ def __init__(self, app: str, args: List[str] = [], api_url: str = "http://127.0. SpeculosInstance.start(self) Api.__init__(self, api_url) - def apdu_exchange(self, cla: int, ins: int, data: bytes = b"", p1: int = 0, p2: int = 0) -> bytes: + def apdu_exchange(self, cla: int, ins: int, data: bytes = b"", p1: int = 0, p2: int = 0) -> ApduResponse: apdu = bytes([cla, ins, p1, p2, len(data)]) + data - return Api._apdu_exchange(self, apdu) + return ApduResponse(Api._apdu_exchange(self, apdu)) @contextmanager def apdu_exchange_nowait( @@ -206,7 +218,7 @@ def apdu_exchange_nowait( apdu = bytes([cla, ins, p1, p2, len(data)]) + data response = None try: - response = Api._apdu_exchange_nowait(self, apdu) + response = Api._apdu_exchange(self, apdu, stream=True) yield ApduResponse(response) finally: if response: diff --git a/tests/apps/test_btc.py b/tests/apps/test_btc.py index 4dad60fd..c4ba2b59 100755 --- a/tests/apps/test_btc.py +++ b/tests/apps/test_btc.py @@ -68,7 +68,7 @@ def test_btc_get_public_key_with_user_approval(client, app): client.press_and_release("both") with pytest.raises(speculos.client.ApduException) as e: - response.receive() + response.data() assert e.value.sw == 0x6985