-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathget_key.py
executable file
·113 lines (77 loc) · 3.23 KB
/
get_key.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python3
# Modified from Nicolas's initial script
# Thanks to Siguza and Snoolie for the AEA auth block parsing information
# Requirements: pip3 install requests pyhpke
import argparse
import base64
import json
import sys
from pathlib import Path
from pprint import pprint
import requests
from pyhpke import AEADId, CipherSuite, KDFId, KEMId, KEMKey
# The only AEA profile ID this script accepts; get_key() rejects any other value.
AEA_PROFILE__HKDF_SHA256_AESCTR_HMAC__SYMMETRIC__NONE = 1

# HPKE cipher suite used by get_key() to open the wrapped key:
# KEM = DHKEM(P-256, HKDF-SHA256), KDF = HKDF-SHA256, AEAD = AES-256-GCM.
suite = CipherSuite.new(
    KEMId.DHKEM_P256_HKDF_SHA256,
    KDFId.HKDF_SHA256,
    AEADId.AES256_GCM,
)
def error(msg):
    """Print *msg* to stderr and terminate the process with exit status 1.

    This function never returns: it raises ``SystemExit(1)``.

    Args:
        msg: The message to report; it is passed to ``print`` unchanged.
    """
    print(msg, file=sys.stderr)
    # Use sys.exit rather than the bare exit() helper: the latter is injected
    # by the ``site`` module for interactive use and is not guaranteed to
    # exist (e.g. ``python -S`` or frozen executables).
    sys.exit(1)
def _parse_auth_fields(auth_data_blob):
    """Split an AEA auth data blob into a ``{key: value}`` dict of strings.

    The blob is a sequence of fields. Each field starts with a 4-byte
    little-endian size that counts the size prefix itself, followed by the
    key, a NUL byte, and the value. Key and value are decoded as UTF-8.
    If a key occurs more than once, the last occurrence wins.

    Malformed input is reported through ``error()``, which exits the process.
    """
    fields = {}
    total = len(auth_data_blob)
    offset = 0
    while offset < total:
        remaining = total - offset
        if remaining < 4:
            error(f"Truncated auth data field: {remaining} trailing bytes")
        field_size = int.from_bytes(auth_data_blob[offset : offset + 4], "little")
        # The size covers its own 4-byte prefix, so anything <= 4 leaves no
        # room for the "key NUL value" payload; anything larger than what is
        # left would read past the end of the blob.
        if field_size <= 4 or field_size > remaining:
            error(f"Invalid auth data field size: {field_size}")
        payload = auth_data_blob[offset + 4 : offset + field_size]
        key, separator, value = payload.partition(b"\x00")
        if not separator:
            error("Malformed auth data field: missing NUL separator")
        try:
            fields[key.decode()] = value.decode()
        except UnicodeDecodeError:
            error("Malformed auth data field: not valid UTF-8")
        offset += field_size
    return fields


def get_key(f, verbose: bool = False):
    """Recover the key of an AEA archive and print it base64-encoded.

    Reads the 12-byte AEA header and the auth data blob from *f*, downloads
    the PEM private key from the URL stored in the
    ``com.apple.wkms.fcs-key-url`` field, and uses it to HPKE-open the
    ``wrapped-key`` found in the JSON ``com.apple.wkms.fcs-response`` field.

    Args:
        f: Binary stream positioned at the start of the AEA data. Only
            ``f.read(n)`` is called on it.
        verbose: When true, dump the parsed auth fields to stderr and prefix
            the printed key with ``"Key: "``.

    Invalid or incomplete AEA data is reported via ``error()``, which prints
    to stderr and exits with status 1. Exceptions from the HTTP request,
    JSON/base64 decoding and the HPKE operations are not caught here.
    """
    # Header layout as read below: bytes 0-3 magic, bytes 4-6 profile ID,
    # bytes 8-11 auth data blob size (byte 7 is not inspected).
    header = f.read(12)
    if len(header) != 12:
        error(f"Expected 12 bytes, got {len(header)}")

    magic = header[:4]
    if magic != b"AEA1":
        error(f"Invalid magic: {magic.hex()}")

    profile = int.from_bytes(header[4:7], "little")
    if profile != AEA_PROFILE__HKDF_SHA256_AESCTR_HMAC__SYMMETRIC__NONE:
        error(f"Invalid AEA profile: {profile}")

    auth_data_blob_size = int.from_bytes(header[8:12], "little")
    if auth_data_blob_size == 0:
        error("No auth data blob")

    auth_data_blob = f.read(auth_data_blob_size)
    if len(auth_data_blob) != auth_data_blob_size:
        error(f"Expected {auth_data_blob_size} bytes, got {len(auth_data_blob)}")

    fields = _parse_auth_fields(auth_data_blob)

    if verbose:
        pprint(fields, stream=sys.stderr)

    if "com.apple.wkms.fcs-response" not in fields:
        error("No fcs-response field found!")

    if "com.apple.wkms.fcs-key-url" not in fields:
        error("No fcs-key-url field found!")

    fcs_response = json.loads(fields["com.apple.wkms.fcs-response"])
    enc_request = base64.b64decode(fcs_response["enc-request"])
    wrapped_key = base64.b64decode(fcs_response["wrapped-key"])
    url = fields["com.apple.wkms.fcs-key-url"]

    # Fetch the recipient private key (PEM) named by the archive itself.
    r = requests.get(url, timeout=10)
    r.raise_for_status()

    privkey = KEMKey.from_pem(r.text)
    recipient = suite.create_recipient_context(enc_request, privkey)
    pt = recipient.open(wrapped_key)

    if verbose:
        print(f"Key: {base64.b64encode(pt).decode()}")
    else:
        print(base64.b64encode(pt).decode())
def main(path: str, verbose: bool = False):
    """Print the key for the AEA archive at *path*.

    *path* is treated as a URL when it begins with ``http://`` or
    ``https://``; in that case the archive is streamed so that only the
    bytes ``get_key`` asks for are read. Otherwise it is opened as a local
    file, and a missing file is reported through ``error()``.

    Args:
        path: Local filesystem path or HTTP(S) URL of the AEA archive.
        verbose: Forwarded to ``get_key``.
    """
    is_remote = path.startswith(("http://", "https://"))

    if not is_remote:
        local_file = Path(path)
        if not local_file.exists():
            error(f"File {path} does not exist")
        with local_file.open("rb") as stream:
            get_key(stream, verbose)
        return

    with requests.get(path, timeout=10, stream=True) as resp:
        resp.raise_for_status()
        # resp.raw is the underlying file-like body of the streamed response.
        get_key(resp.raw, verbose)
if __name__ == "__main__":
    # Command-line entry point: one positional path/URL plus an optional
    # -v/--verbose flag, both handed straight to main().
    parser = argparse.ArgumentParser(
        description="Get the key for an AEA file or URL",
    )
    parser.add_argument("path", help="Path or URL to the AEA file")
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Show verbose output",
    )
    args = parser.parse_args()
    main(path=args.path, verbose=args.verbose)