Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve tests #436

Merged
merged 30 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
652219b
make fixtures available globally
EduardSchwarzkopf May 30, 2024
5b10d00
add pacu modules to testpath
EduardSchwarzkopf May 30, 2024
d88c24a
use local conftest
EduardSchwarzkopf May 30, 2024
3d4e67f
use conftest and formatting
EduardSchwarzkopf May 30, 2024
8be81a6
move fixture to conftest.py
EduardSchwarzkopf May 30, 2024
c8606c2
move fixtures to conftest.py
EduardSchwarzkopf May 30, 2024
9253223
init tests
EduardSchwarzkopf May 30, 2024
958e5d3
rename file, to have unique test file names
EduardSchwarzkopf May 30, 2024
8798c6a
update aws_credentials fixture with cleanup
EduardSchwarzkopf May 30, 2024
0eb3eab
standalone test poc
EduardSchwarzkopf May 31, 2024
f0542a2
update conftest
EduardSchwarzkopf May 31, 2024
20299c3
add region as setting
EduardSchwarzkopf May 31, 2024
302bf1f
aws_credentials are now a autouse fixture
EduardSchwarzkopf May 31, 2024
771690e
add check for TokenType
EduardSchwarzkopf May 31, 2024
5ffe241
add conftest
EduardSchwarzkopf May 31, 2024
a58f1c6
add dataclasses
EduardSchwarzkopf May 31, 2024
1dca34c
add minimal test
EduardSchwarzkopf Jun 5, 2024
5a59733
update fixture to provide with context
EduardSchwarzkopf Jun 5, 2024
c084592
add client type
EduardSchwarzkopf Jun 5, 2024
545a5eb
update imports
EduardSchwarzkopf Jun 5, 2024
2a1fb5e
update imports
EduardSchwarzkopf Jun 5, 2024
dee66ad
sort imports
EduardSchwarzkopf Jun 8, 2024
efa9950
remove schema
EduardSchwarzkopf Jun 8, 2024
ebb81b0
add sanity test
EduardSchwarzkopf Jun 8, 2024
5827f95
update user pool
EduardSchwarzkopf Jun 8, 2024
c650612
add imports for sanity check
EduardSchwarzkopf Jun 8, 2024
7aba55a
update password
EduardSchwarzkopf Jun 8, 2024
440b0e1
update tests
EduardSchwarzkopf Jun 8, 2024
acbd443
Merge branch 'RhinoSecurityLabs:master' into improve-tests
EduardSchwarzkopf Jun 8, 2024
b99ed79
Merge branch 'master' into improve-tests
DaveYesland Jun 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os
import pytest
from pacu import settings, Main
from pacu import core
from pacu.core.models import PacuSession
from sqlalchemy import orm, Column, create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy_utils import JSONType
from sqlalchemy.engine import Engine

from pacu.modules.cognito__attack.tests.dataclasses import AWSCredentials

settings.DATABASE_CONNECTION_PATH = "sqlite:///:memory:"


@pytest.fixture(scope="function")
def db() -> core.base:
core.base.engine: Engine = create_engine(settings.DATABASE_CONNECTION_PATH)
core.base.Session: sessionmaker = sessionmaker(bind=core.base.engine)
core.base.Base.metadata.create_all(core.base.engine)
yield core.base.Session()


@pytest.fixture(scope="function")
def pacu(db, aws_credentials: AWSCredentials, active_session: PacuSession):
pacu = Main()
pacu.database = db

pacu.set_keys(
key_alias="pytest",
access_key_id=aws_credentials.access_key_id,
secret_access_key=aws_credentials.secret_access_key,
session_token=aws_credentials.session_token,
)

yield pacu


@pytest.fixture(scope="function")
def active_session(db, pacu_session: PacuSession):
pacu_session.activate(db)
yield pacu_session


@pytest.fixture(scope="function")
def pacu_session(db: orm.session.Session):
query: orm.Query = db.query(PacuSession)
assert query.count() == 0

pacu_session = PacuSession(name="test")
db.add(pacu_session)
yield pacu_session


@pytest.fixture(scope="function")
def db_new_column(db: Session):
PacuSession.TestSvc = Column(JSONType, nullable=False, default=dict)
PacuSession.aws_data_field_names = PacuSession.aws_data_field_names + ("TestSvc",)
core.base.Session: sessionmaker = sessionmaker(bind=core.base.engine)
yield core.base.Session()


@pytest.fixture(scope="function")
def pacu_with_data(pacu: Main, active_session: PacuSession):
active_session.update(pacu.database, CloudWatch={"test_key": "test_value"})
yield pacu


@pytest.fixture(autouse=True)
def aws_credentials():
"""Mocked AWS Credentials for moto."""

value = "testing"
creds = AWSCredentials(
access_key_id=value,
secret_access_key=value,
session_token=value,
security_token=value,
)

os.environ["AWS_ACCESS_KEY_ID"] = creds.access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = creds.secret_access_key
os.environ["AWS_SECURITY_TOKEN"] = creds.session_token
os.environ["AWS_SESSION_TOKEN"] = creds.session_token

yield creds
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import pytest
import moto
import boto3
import os


@pytest.fixture(scope="function")
def s3():
with moto.mock_s3():
yield boto3.client("s3", region_name="us-east-1")


@pytest.fixture
def environ():
os.environ["PRINCIPAL"] = "asdf"
Original file line number Diff line number Diff line change
@@ -1,66 +1,61 @@
import os
import re
import boto3
import moto

import pytest

from chalice.test import Client
from pacu.modules.cfn__resource_injection.cfn__resource_injection_lambda.app import app, add_role_dict, add_role_bytes, fetch, update
from pacu.modules.cfn__resource_injection.cfn__resource_injection_lambda.app import (
add_role_dict,
add_role_bytes,
update,
)


@pytest.fixture
def environ():
os.environ['PRINCIPAL'] = 'asdf'
def upload_json(environ, s3):
s3.create_bucket(Bucket="test-bucket")
s3.put_object(Bucket="test-bucket", Key="test-key", Body=new_cfn_json)
return "test-bucket", "test-key", new_cfn_json


def test_add_role_dict(environ):
@pytest.mark.usefixtures("environ")
def test_add_role_dict():
cfn = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Test CloudFormation Template",
"Globals": {},
"Outputs": {},
"Resources": {
"OurBucket": {
"Properties": {},
"Type": "AWS::S3::Bucket"
}
}
"Resources": {"OurBucket": {"Properties": {}, "Type": "AWS::S3::Bucket"}},
}

assert {
'AWSTemplateFormatVersion': '2010-09-09',
'Description': 'Test CloudFormation Template',
'Globals': {},
'Outputs': {},
'Resources': {
'MaintenanceRole': {
'Properties': {
'AssumeRolePolicyDocument': '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", '
'"Principal": {"AWS": "asdf"}, "Action": "sts:AssumeRole"}]}',
'Policies': [
{
'PolicyDocument': {
'Statement': [
{
'Action': '*',
'Effect': 'Allow',
'Resource': '*'
}
],
'Version': '2012-10-17',
},
'PolicyName': 'default',
}
]
},
'Type': 'AWS::IAM::Role',
},
'OurBucket': {
'Properties': {},
'Type': 'AWS::S3::Bucket',
}
}
} == add_role_dict(cfn)
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Test CloudFormation Template",
"Globals": {},
"Outputs": {},
"Resources": {
"MaintenanceRole": {
"Properties": {
"AssumeRolePolicyDocument": '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", '
'"Principal": {"AWS": "asdf"}, "Action": "sts:AssumeRole"}]}',
"Policies": [
{
"PolicyDocument": {
"Statement": [
{"Action": "*", "Effect": "Allow", "Resource": "*"}
],
"Version": "2012-10-17",
},
"PolicyName": "default",
}
],
},
"Type": "AWS::IAM::Role",
},
"OurBucket": {
"Properties": {},
"Type": "AWS::S3::Bucket",
},
},
} == add_role_dict(cfn)


new_cfn_json = b"""
Expand Down Expand Up @@ -134,49 +129,27 @@ def test_add_role_bytes_yaml(environ):
add_role_bytes(new_cfn_yaml)


@pytest.fixture
def upload_json(environ, s3):
s3.create_bucket(Bucket='test-bucket')
s3.put_object(Bucket='test-bucket', Key='test-key', Body=new_cfn_json)
return 'test-bucket', 'test-key', new_cfn_json


@pytest.fixture
def aws_credentials():
"""Mocked AWS Credentials for moto."""
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
os.environ['AWS_SECURITY_TOKEN'] = 'testing'
os.environ['AWS_SESSION_TOKEN'] = 'testing'


@pytest.fixture(scope='function')
def s3(aws_credentials):
with moto.mock_s3():
yield boto3.client('s3', region_name='us-east-1')


def test_update(s3, upload_json):
bucket, key, _ = upload_json[0], upload_json[1], upload_json[2]

body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
assert clean_json(new_cfn_json.decode()) == clean_json(body.decode())

update(s3, bucket, key)

body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
assert clean_json(modified_cfn_json.decode()) == clean_json(body.decode())


def test_update_second_time(s3, upload_json):
bucket, key, body = upload_json[0], upload_json[1], upload_json[2]
bucket, key = upload_json[0], upload_json[1]
update(s3, bucket, key)
resp = s3.get_object(Bucket=bucket, Key=key)
a = clean_json(modified_cfn_json.decode())
b = clean_json(resp['Body'].read().decode())
b = clean_json(resp["Body"].read().decode())
assert a == b


def clean_json(s: str):
c = re.compile(r'\s')
return c.sub('', s)
c = re.compile(r"\s")
return c.sub("", s)
37 changes: 22 additions & 15 deletions pacu/modules/cognito__attack/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import re
import argparse
import base64
import json
import re
import webbrowser
from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Optional

import qrcode
import argparse
import json
from botocore.client import BaseClient
from botocore.exceptions import ClientError
from pycognito.aws_srp import AWSSRP
from dataclasses import dataclass
from typing import List, Dict, Optional
from pycognito.exceptions import SoftwareTokenMFAChallengeException
from copy import deepcopy
from botocore.exceptions import ClientError

from pacu import Main
from botocore.client import BaseClient

# Using Spencer's iam_enum.py as a template

Expand Down Expand Up @@ -500,9 +502,12 @@ def main(args, pacu_main: Main):
"Your refresh token is: "
+ tokens["AuthenticationResult"]["RefreshToken"]
)
print(
"Your token type is: " + tokens["AuthenticationResult"]["TokenType"]
)

if "TokenType" in tokens["AuthenticationResult"]:
print(
"Your token type is: "
+ tokens["AuthenticationResult"]["TokenType"]
)
attack_user["Username"] = username
attack_user["Region"] = up_client["Region"]
attack_user["UserPoolId"] = up_client["UserPoolId"]
Expand All @@ -516,9 +521,11 @@ def main(args, pacu_main: Main):
attack_user["Tokens"]["RefreshToken"] = tokens["AuthenticationResult"][
"RefreshToken"
]
attack_user["Tokens"]["TokenType"] = tokens["AuthenticationResult"][
"TokenType"
]

if "TokenType" in tokens["AuthenticationResult"]:
attack_user["Tokens"]["TokenType"] = tokens["AuthenticationResult"][
"TokenType"
]
credentials = get_identity_credentials(
cognito_identity_pools,
identity_client,
Expand Down Expand Up @@ -944,7 +951,7 @@ def main(args, pacu_main: Main):
for var in vars(args):
if var == "regions":
continue
if not getattr(args, var) and ARG_FIELD_MAPPER[var] in gathered_data:
if not getattr(args, var) and ARG_FIELD_MAPPER.get(var) in gathered_data:
del gathered_data[ARG_FIELD_MAPPER[var]]

cognito_data = deepcopy(session.Cognito)
Expand Down
48 changes: 48 additions & 0 deletions pacu/modules/cognito__attack/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import boto3
import moto
import pytest

from pacu.modules.cognito__attack.tests.dataclasses import CognitoServiceConfig
from pacu.settings import REGION


@pytest.fixture
def mock_cognito_user_pool():

with moto.mock_cognitoidp():

client = boto3.client(
"cognito-idp",
region_name=REGION,
)

response = client.create_user_pool(
PoolName="TestUserPool", UsernameAttributes=["email"]
)

user_pool_id = response["UserPool"]["Id"]

client_response = client.create_user_pool_client(
ClientName="AppClient",
GenerateSecret=False,
UserPoolId=user_pool_id,
)

with moto.mock_cognitoidentity():
c = boto3.client(
"cognito-identity",
region_name=REGION,
)

c_resposnse = c.create_identity_pool(
IdentityPoolName="TestIdentityPool",
AllowUnauthenticatedIdentities=False,
)

yield CognitoServiceConfig(
client=client,
user_pool_id=user_pool_id,
client_id=client_response["UserPoolClient"]["ClientId"],
client_name=client_response["UserPoolClient"]["ClientName"],
identity_pool_id=c_resposnse["IdentityPoolId"],
)
Loading
Loading