-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvc.python
114 lines (100 loc) · 4.53 KB
/
vc.python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import jwt
import requests
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import os
import urllib3
import json
# Disable insecure request warnings (only for development with self-signed certs).
# NOTE: this is a module-level side effect: it runs as soon as the file is
# imported or executed and silences urllib3's InsecureRequestWarning for the
# whole process, i.e. the warning that requests made without certificate
# verification would otherwise emit.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class DID:
    """A throwaway decentralized identifier backed by a fresh RSA key pair.

    Attributes:
        private_key: 2048-bit RSA private key (public exponent 65537).
        public_key: Public half of ``private_key``.
        id: Identifier of the form ``did:example:<32 hex characters>``,
            built from 16 random bytes.
    """

    def __init__(self):
        print("Step 1: Generating DID")
        key_pair = rsa.generate_private_key(key_size=2048, public_exponent=65537)
        self.private_key = key_pair
        self.public_key = key_pair.public_key()
        # 16 random bytes -> 32 hex characters used as the method-specific id.
        random_suffix = os.urandom(16).hex()
        self.id = f"did:example:{random_suffix}"
        print(f"DID generated: {self.id}")
class VC:
    """A minimal verifiable-credential record bound to a DID.

    Args:
        did: Object exposing an ``id`` attribute (the subject's DID). It is
            stored as-is so that callers can reach its key material later.
        claims: Claims carried by the credential (stored by reference,
            not copied).
        issuer: Identifier of the issuing party. Defaults to the placeholder
            ``"did:example:issuer"`` that was previously hard-coded.

    Attributes:
        did: The ``did`` argument.
        issuer: The ``issuer`` argument.
        subject: ``did.id``.
        claims: The ``claims`` argument.
    """

    def __init__(self, did, claims, issuer="did:example:issuer"):
        print("\nStep 2: Creating Verifiable Credential")
        self.did = did
        self.issuer = issuer
        self.subject = did.id
        self.claims = claims
        print(f"VC created for subject: {self.subject}")
        print(f"VC claims: {self.claims}")
def generate_zkp(vc, challenge):
    """Sign ``"<access claim>:<challenge>"`` with the credential holder's key.

    The "ZKP" produced here is an RSA-PSS signature (SHA-256, MGF1 with
    SHA-256, maximum salt length) over the UTF-8 encoded string made of
    ``vc.claims['access']``, a colon, and ``challenge``.

    Args:
        vc: Credential whose ``claims['access']`` value is signed and whose
            ``did.private_key`` performs the signing.
        challenge: Challenge value to bind into the signed message.

    Returns:
        The signature bytes.

    Raises:
        KeyError: If ``vc.claims`` has no ``'access'`` entry.
    """
    print(f"Generating ZKP for challenge: {challenge}")
    signed_payload = f"{vc.claims['access']}:{challenge}".encode()
    pss_padding = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    proof = vc.did.private_key.sign(signed_payload, pss_padding, hashes.SHA256())
    # Only a short prefix is logged; the full signature is returned.
    print(f"ZKP generated: {proof.hex()[:20]}...")
    return proof
def register_public_key(did, op_url, *, timeout=10, verify=False):
    """Register the DID's public key with the OP via ``POST {op_url}/register_key``.

    The key is sent PEM-encoded (SubjectPublicKeyInfo) together with the DID
    identifier as a JSON body: ``{"did": ..., "public_key": ...}``.

    Args:
        did: Object exposing ``id`` and ``public_key``.
        op_url: Base URL of the OP, without a trailing slash.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.
        verify: TLS verification setting passed to ``requests``. The default
            ``False`` keeps the original development behaviour (self-signed
            certificates); pass ``True`` or a CA bundle path elsewhere.

    Raises:
        requests.exceptions.HTTPError: If the OP answers with an error status.
        requests.exceptions.RequestException: On connection errors or timeout.
    """
    print("\nStep 3: Registering public key with OP")
    print(f"OP URL: {op_url}")
    public_key_pem = did.public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    data = {'did': did.id, 'public_key': public_key_pem.decode()}
    print(f"Request data: {json.dumps(data, indent=2)}")
    response = requests.post(
        f"{op_url}/register_key", json=data, verify=verify, timeout=timeout
    )
    print(f"Response status: {response.status_code}")
    # The body is only logged here. An error page that is not JSON must not
    # raise a decode error that hides the real HTTP status, so fall back to
    # the raw text in that case.
    try:
        body_text = json.dumps(response.json(), indent=2)
    except ValueError:
        body_text = response.text
    print(f"Response body: {body_text}")
    response.raise_for_status()
    print("Public key registered successfully")
def authenticate_with_op(did, vc, op_url, *, timeout=10, verify=False):
    """Run the challenge/response authentication against the OP.

    Steps:
        1. ``GET {op_url}/challenge`` and read the ``challenge`` field.
        2. Sign the challenge with ``generate_zkp``.
        3. ``POST {op_url}/authenticate`` with the DID, the hex-encoded
           signature and the credential claims; the challenge is echoed back
           in the ``X-Challenge`` header.

    Args:
        did: Object exposing ``id``.
        vc: Credential passed to ``generate_zkp``; its ``claims`` are sent
            to the OP.
        op_url: Base URL of the OP, without a trailing slash.
        timeout: Seconds to wait for each request. Without a timeout
            ``requests`` may block indefinitely.
        verify: TLS verification setting passed to ``requests``. The default
            ``False`` keeps the original development behaviour (self-signed
            certificates); pass ``True`` or a CA bundle path elsewhere.

    Returns:
        The value of the ``jwt`` field of the authentication response.

    Raises:
        requests.exceptions.HTTPError: If either request returns an error status.
        requests.exceptions.RequestException: On connection errors, timeout,
            or (with recent ``requests`` versions) a successful response
            whose body is not JSON.
        KeyError: If a response lacks the ``challenge`` / ``jwt`` field.
    """
    print("\nStep 4: Starting authentication with OP")
    print(f"OP URL: {op_url}")

    print("Step 4.1: Requesting challenge")
    challenge_response = requests.get(
        f"{op_url}/challenge", verify=verify, timeout=timeout
    )
    print(f"Challenge response status: {challenge_response.status_code}")
    # Logging only: a non-JSON error body must not mask the HTTP status.
    try:
        challenge_body = json.dumps(challenge_response.json(), indent=2)
    except ValueError:
        challenge_body = challenge_response.text
    print(f"Challenge response body: {challenge_body}")
    challenge_response.raise_for_status()
    challenge = challenge_response.json()['challenge']

    print("\nStep 4.2: Generating ZKP")
    zkp = generate_zkp(vc, challenge)

    auth_data = {
        'did': did.id,
        'zkp': zkp.hex(),
        'vc_claims': vc.claims,
    }
    headers = {'X-Challenge': challenge}

    print("\nStep 4.3: Sending authentication request")
    print(f"Request headers: {json.dumps(headers, indent=2)}")
    print(f"Request data: {json.dumps(auth_data, indent=2)}")
    auth_response = requests.post(
        f"{op_url}/authenticate",
        json=auth_data,
        headers=headers,
        verify=verify,
        timeout=timeout,
    )
    print(f"Authentication response status: {auth_response.status_code}")
    # Logging only: a non-JSON error body must not mask the HTTP status.
    try:
        auth_body = json.dumps(auth_response.json(), indent=2)
    except ValueError:
        auth_body = auth_response.text
    print(f"Authentication response body: {auth_body}")
    auth_response.raise_for_status()

    jwt_token = auth_response.json()['jwt']
    # Only a short prefix of the token is echoed on this line.
    print(f"Received JWT: {jwt_token[:20]}...")
    return jwt_token
def get_oauth_token(jwt_token, op_url, *, timeout=10, verify=False):
    """Exchange a JWT for an OAuth token via ``POST {op_url}/token``.

    Args:
        jwt_token: Token sent to the OP as ``{"jwt": jwt_token}``.
        op_url: Base URL of the OP, without a trailing slash.
        timeout: Seconds to wait for the server. Without a timeout
            ``requests`` may block indefinitely.
        verify: TLS verification setting passed to ``requests``. The default
            ``False`` keeps the original development behaviour (self-signed
            certificates); pass ``True`` or a CA bundle path elsewhere.

    Returns:
        A ``(access_token, resource)`` tuple taken from the fields of the
        same names in the JSON response.

    Raises:
        requests.exceptions.HTTPError: If the OP answers with an error status.
        requests.exceptions.RequestException: On connection errors, timeout,
            or (with recent ``requests`` versions) a successful response
            whose body is not JSON.
        KeyError: If the response lacks ``access_token`` or ``resource``.
    """
    print("\nStep 5: Exchanging JWT for OAuth token")
    data = {'jwt': jwt_token}
    print(f"Request data: {json.dumps(data, indent=2)}")
    token_response = requests.post(
        f"{op_url}/token", json=data, verify=verify, timeout=timeout
    )
    print(f"Token response status: {token_response.status_code}")
    # Logging only: a non-JSON error body must not mask the HTTP status.
    try:
        body_text = json.dumps(token_response.json(), indent=2)
    except ValueError:
        body_text = token_response.text
    print(f"Token response body: {body_text}")
    token_response.raise_for_status()
    # Parse once and read both fields from the same payload.
    payload = token_response.json()
    return payload['access_token'], payload['resource']
def main():
    """Run the demo end to end against the OP at ``https://localhost:5000``.

    Creates a DID and a credential with the claim ``{'access': 'Ken'}``,
    registers the public key, authenticates, exchanges the JWT for an OAuth
    token and prints the result. Any ``requests`` exception raised along the
    way is reported on stdout instead of propagating.
    """
    holder = DID()
    credential = VC(holder, {'access': 'Ken'})
    provider_url = 'https://localhost:5000'

    try:
        register_public_key(holder, provider_url)
        session_jwt = authenticate_with_op(holder, credential, provider_url)
        access_token, protected_resource = get_oauth_token(session_jwt, provider_url)
    except requests.exceptions.RequestException as err:
        print(f"Error occurred: {err}")
    else:
        # Reached only when every network step above succeeded.
        print(f"\nStep 6: Final Results")
        print(f"OAuth Token: {access_token}")
        print(f"Accessed Resource: {protected_resource}")
# Run the demo flow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()