Skip to content

Commit

Permalink
Update EDA auth class to not use old Role model (#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dostonbek1 authored Apr 18, 2024
1 parent eafdea6 commit f772005
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 70 deletions.
22 changes: 3 additions & 19 deletions ansible_base/jwt_consumer/eda/auth.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,15 @@
import logging

from ansible_base.jwt_consumer.common.auth import JWTAuthentication
from ansible_base.jwt_consumer.common.exceptions import InvalidService
from drf_spectacular.extensions import OpenApiAuthenticationExtension

try:
from aap_eda.core import models
from drf_spectacular.extensions import OpenApiAuthenticationExtension
except ImportError:
raise InvalidService("eda")
from ansible_base.jwt_consumer.common.auth import JWTAuthentication

logger = logging.getLogger("ansible_base.jwt_consumer.eda.auth")


class EDAJWTAuthentication(JWTAuthentication):
def process_permissions(self, user, claims, token):
logger.info("Processing permissions")

if token.get("is_superuser", False):
self._add_roles(user, "Admin", "is_superuser")

if token.get("is_system_auditor", False):
self._add_roles(user, "Auditor", "is_system_auditor")

def _add_roles(self, user, role_name, user_type):
logger.info(f"{user.username} is {user_type}. Adding role {role_name} to user {user.username}")
role_id = models.Role.objects.filter(name=role_name).first().id
user.roles.add(role_id)
logger.info("Processing permissions for {}".format(user.username))


class EDAJWTAuthScheme(OpenApiAuthenticationExtension):
Expand Down
59 changes: 8 additions & 51 deletions test_app/tests/jwt_consumer/eda/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
import sys
from unittest.mock import MagicMock

import pytest
from ansible_base.jwt_consumer.eda.auth import EDAJWTAuthentication


def test_eda_import_error():
from ansible_base.jwt_consumer.common.exceptions import InvalidService

with pytest.raises(InvalidService):
import ansible_base.jwt_consumer.eda.auth # noqa: 401
def test_eda_process_permissions(user, caplog):
authentication = EDAJWTAuthentication()
claims = {}
token = {}
with caplog.at_level(logging.INFO):
authentication.process_permissions(user, claims, token)
assert f"Processing permissions for {user.username}" in caplog.text


def test_eda_jwt_auth_scheme():
Expand All @@ -19,48 +21,3 @@ def test_eda_jwt_auth_scheme():
scheme = EDAJWTAuthScheme(None)
response = scheme.get_security_definition(None)
assert 'name' in response and response['name'] == 'X-DAB-JW-TOKEN'


def filter_function(name):
role = None
if name == 'Admin':
role = MagicMock(id=1)
elif name == 'Auditor':
role = MagicMock(id=2)
return MagicMock(**{'first.return_value': role})


@pytest.fixture
def mocked_authenticator():
sys.modules['aap_eda.core'] = MagicMock()
from ansible_base.jwt_consumer.eda.auth import EDAJWTAuthentication # noqa: E402
from ansible_base.jwt_consumer.eda.auth import models

models.Role.objects.filter = filter_function

authenticator = EDAJWTAuthentication()
# patch.object(authenticator.models.Role.objects, 'filter', alan_filter)
# authenticator.models = MagicMock(**{"Role.objects.filter": filter_function})
return authenticator


def test_eda_jwt_auth_add_roles(mocked_authenticator, caplog):
with caplog.at_level(logging.INFO):
user = MagicMock(username='timmy', roles=set())
user_type = 'super_user'
role_name = 'Auditor'
mocked_authenticator._add_roles(user, role_name, user_type)
assert f"{user.username} is {user_type}. Adding role {role_name} to user {user.username}" in caplog.text


@pytest.mark.parametrize(
'is_superuser,is_system_auditor,results', ((False, False, set()), (True, False, set([1])), (False, True, set([2])), (True, True, set([1, 2])))
)
def test_eda_jwt_auth_process_permissions(mocked_authenticator, is_superuser, is_system_auditor, results):
user = MagicMock(username='timmy', roles=set())
token = {
'is_superuser': is_superuser,
'is_system_auditor': is_system_auditor,
}
mocked_authenticator.process_permissions(user, {}, token)
assert user.roles == results

0 comments on commit f772005

Please sign in to comment.