-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PB-1281: Disable old authentication for STAC v1
- Loading branch information
Showing
8 changed files
with
447 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
from django.conf import settings | ||
|
||
from rest_framework.authentication import BasicAuthentication | ||
from rest_framework.authentication import SessionAuthentication | ||
from rest_framework.authentication import TokenAuthentication | ||
|
||
|
||
class RestrictedBasicAuthentication(BasicAuthentication): | ||
""" A Django rest framework basic authentication class that skips v1 of STAC API. """ | ||
|
||
def authenticate(self, request): | ||
if settings.FEATURE_AUTH_RESTRICT_V1 and request.path.startswith( | ||
f'/{settings.STAC_BASE}/v1/' | ||
): | ||
return None | ||
|
||
return super().authenticate(request) | ||
|
||
|
||
class RestrictedSessionAuthentication(SessionAuthentication): | ||
""" A Django rest framework session authentication class that skips v1 of STAC API. """ | ||
|
||
def authenticate(self, request): | ||
if settings.FEATURE_AUTH_RESTRICT_V1 and request.path.startswith( | ||
f'/{settings.STAC_BASE}/v1/' | ||
): | ||
return None | ||
|
||
return super().authenticate(request) | ||
|
||
|
||
class RestrictedTokenAuthentication(TokenAuthentication): | ||
""" A Django rest framework token authentication class that skips v1 of STAC API. """ | ||
|
||
def authenticate(self, request): | ||
if settings.FEATURE_AUTH_RESTRICT_V1 and request.path.startswith( | ||
f'/{settings.STAC_BASE}/v1/' | ||
): | ||
return None | ||
|
||
return super().authenticate(request) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,9 @@ | |
from django.conf import settings | ||
from django.contrib.auth import get_user_model | ||
from django.test import Client | ||
from django.test import override_settings | ||
|
||
from rest_framework.authtoken.models import Token | ||
|
||
from stac_api.models.item import Asset | ||
from stac_api.models.item import AssetUpload | ||
|
@@ -1310,3 +1313,71 @@ def test_create_multipart_upload_on_external_asset(self): | |
) | ||
|
||
self.assertStatusCode(400, response) | ||
|
||
|
||
@override_settings(FEATURE_AUTH_RESTRICT_V1=True) | ||
class AssetUploadDisabledAuthenticationEndpointTestCase(AssetUploadBaseTest): | ||
|
||
def setUp(self): # pylint: disable=invalid-name | ||
self.client = Client() | ||
self.factory = Factory() | ||
self.collection = self.factory.create_collection_sample().model | ||
self.item = self.factory.create_item_sample(collection=self.collection).model | ||
self.asset = self.factory.create_asset_sample(item=self.item, sample='asset-no-file').model | ||
self.maxDiff = None # pylint: disable=invalid-name | ||
self.username = 'SherlockHolmes' | ||
self.password = '221B_BakerStreet' | ||
self.user = get_user_model().objects.create_user( | ||
self.username, '[email protected]', self.password | ||
) | ||
|
||
def run_test(self, headers=None): | ||
number_parts = 2 | ||
file_like, checksum_multihash = get_file_like_object(1 * KB) | ||
offset = 1 * KB // number_parts | ||
md5_parts = create_md5_parts(number_parts, offset, file_like) | ||
|
||
# Make sure POST fails if old authentication is disabled | ||
response = self.client.post( | ||
self.get_create_multipart_upload_path(), | ||
headers=headers, | ||
data={ | ||
'number_parts': number_parts, | ||
'file:checksum': checksum_multihash, | ||
'md5_parts': md5_parts | ||
}, | ||
content_type="application/json" | ||
) | ||
self.assertStatusCode(401, response, msg="Unauthorized post was permitted.") | ||
|
||
# Make sure POST abort fails if old authentication is disabled | ||
response = self.client.post( | ||
self.get_abort_multipart_upload_path('upload_id'), | ||
headers=headers, | ||
data={}, | ||
content_type="application/json" | ||
) | ||
self.assertStatusCode(401, response, msg="Unauthorized post was permitted.") | ||
|
||
# Make sure POST complete fails if old authentication is disabled | ||
response = self.client.post( | ||
self.get_complete_multipart_upload_path('upload_id'), | ||
headers=headers, | ||
data={'parts': 'parts'}, | ||
content_type="application/json" | ||
) | ||
self.assertStatusCode(401, response, msg="Unauthorized post was permitted.") | ||
|
||
def test_disabled_session_authentication(self): | ||
self.client.login(username=self.username, password=self.password) | ||
self.run_test() | ||
|
||
def test_disabled_token_authentication(self): | ||
token = Token.objects.create(user=self.user) | ||
headers = {'Authorization': f'Token {token.key}'} | ||
self.run_test(headers=headers) | ||
|
||
def test_disabled_base_authentication(self): | ||
token = b64encode(f'{self.username}:{self.password}'.encode()).decode() | ||
headers = {'Authorization': f'Basic {token}'} | ||
self.run_test(headers=headers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
import logging | ||
from base64 import b64encode | ||
from datetime import datetime | ||
from datetime import timedelta | ||
from json import dumps | ||
|
@@ -7,9 +8,12 @@ | |
|
||
from django.contrib.auth import get_user_model | ||
from django.test import Client | ||
from django.test import override_settings | ||
from django.urls import reverse | ||
from django.utils import timezone | ||
|
||
from rest_framework.authtoken.models import Token | ||
|
||
from stac_api.models.item import Asset | ||
from stac_api.utils import get_asset_path | ||
from stac_api.utils import utc_aware | ||
|
@@ -917,3 +921,68 @@ def test_unauthorized_asset_post_put_patch_delete(self): | |
path = f'/{STAC_BASE_V}/collections/{collection_name}/items/{item_name}/assets/{asset_name}' | ||
response = self.client.delete(path) | ||
self.assertStatusCode(401, response, msg="Unauthorized del was permitted.") | ||
|
||
|
||
@override_settings(FEATURE_AUTH_RESTRICT_V1=True) | ||
class AssetsDisabledAuthenticationEndpointTestCase(StacBaseTestCase): | ||
|
||
@mock_s3_asset_file | ||
def setUp(self): # pylint: disable=invalid-name | ||
self.factory = Factory() | ||
self.collection = self.factory.create_collection_sample().model | ||
self.item = self.factory.create_item_sample(collection=self.collection).model | ||
self.asset = self.factory.create_asset_sample(item=self.item).model | ||
self.client = Client() | ||
self.username = 'SherlockHolmes' | ||
self.password = '221B_BakerStreet' | ||
self.user = get_user_model().objects.create_user( | ||
self.username, '[email protected]', self.password | ||
) | ||
|
||
def run_test(self, headers=None): | ||
collection_name = self.collection.name | ||
item_name = self.item.name | ||
asset_name = self.asset.name | ||
|
||
new_asset = self.factory.create_asset_sample(item=self.item).json | ||
updated_asset = self.factory.create_asset_sample( | ||
item=self.item, name=asset_name, sample='asset-1-updated' | ||
).get_json('post') | ||
|
||
# Make sure POST fails if old authentication is disabled | ||
path = f'/{STAC_BASE_V}/collections/{collection_name}/items/{item_name}/assets' | ||
response = self.client.post( | ||
path, headers=headers, data=new_asset, content_type="application/json" | ||
) | ||
self.assertStatusCode(401, response, msg="Unauthorized post was permitted.") | ||
|
||
# Make sure PUT fails if old authentication is disabled | ||
path = f'/{STAC_BASE_V}/collections/{collection_name}/items/{item_name}/assets/{asset_name}' | ||
response = self.client.put( | ||
path, headers=headers, data=updated_asset, content_type="application/json" | ||
) | ||
self.assertStatusCode(401, response, msg="Unauthorized put was permitted.") | ||
|
||
# Make sure PATCH fails if old authentication is disabled | ||
response = self.client.patch( | ||
path, headers=headers, data=updated_asset, content_type="application/json" | ||
) | ||
self.assertStatusCode(401, response, msg="Unauthorized patch was permitted.") | ||
|
||
# Make sure DELETE fails if old authentication is disabled | ||
response = self.client.delete(path, headers=headers) | ||
self.assertStatusCode(401, response, msg="Unauthorized del was permitted.") | ||
|
||
def test_disabled_session_authentication(self): | ||
self.client.login(username=self.username, password=self.password) | ||
self.run_test() | ||
|
||
def test_disabled_token_authentication(self): | ||
token = Token.objects.create(user=self.user) | ||
headers = {'Authorization': f'Token {token.key}'} | ||
self.run_test(headers=headers) | ||
|
||
def test_disabled_base_authentication(self): | ||
token = b64encode(f'{self.username}:{self.password}'.encode()).decode() | ||
headers = {'Authorization': f'Basic {token}'} | ||
self.run_test(headers=headers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,9 @@ | |
from django.conf import settings | ||
from django.contrib.auth import get_user_model | ||
from django.test import Client | ||
from django.test import override_settings | ||
|
||
from rest_framework.authtoken.models import Token | ||
|
||
from stac_api.models.collection import CollectionAsset | ||
from stac_api.models.collection import CollectionAssetUpload | ||
|
@@ -1286,3 +1289,72 @@ def test_asset_upload_list_parts(self): | |
self.assertS3ObjectExists(key) | ||
self.asset.refresh_from_db() | ||
self.assertEqual(size, self.asset.file_size) | ||
|
||
|
||
@override_settings(FEATURE_AUTH_RESTRICT_V1=True) | ||
class CollectionAssetUploadDisabledAuthenticationEndpointTestCase(CollectionAssetUploadBaseTest): | ||
|
||
def setUp(self): # pylint: disable=invalid-name | ||
self.client = Client() | ||
self.factory = Factory() | ||
self.collection = self.factory.create_collection_sample().model | ||
self.asset = self.factory.create_collection_asset_sample( | ||
collection=self.collection, sample='asset-no-file' | ||
).model | ||
self.maxDiff = None # pylint: disable=invalid-name | ||
self.username = 'SherlockHolmes' | ||
self.password = '221B_BakerStreet' | ||
self.user = get_user_model().objects.create_user( | ||
self.username, '[email protected]', self.password | ||
) | ||
|
||
def run_test(self, headers=None): | ||
number_parts = 2 | ||
file_like, checksum_multihash = get_file_like_object(1 * KB) | ||
offset = 1 * KB // number_parts | ||
md5_parts = create_md5_parts(number_parts, offset, file_like) | ||
|
||
# Make sure POST fails if old authentication is disabled | ||
response = self.client.post( | ||
self.get_create_multipart_upload_path(), | ||
headers=headers, | ||
data={ | ||
'number_parts': number_parts, | ||
'file:checksum': checksum_multihash, | ||
'md5_parts': md5_parts | ||
}, | ||
content_type="application/json" | ||
) | ||
self.assertStatusCode(401, response, msg="Unauthorized post was permitted.") | ||
|
||
# Make sure POST abort fails if old authentication is disabled | ||
response = self.client.post( | ||
self.get_abort_multipart_upload_path('upload_id'), | ||
headers=headers, | ||
data={}, | ||
content_type="application/json" | ||
) | ||
self.assertStatusCode(401, response, msg="Unauthorized post was permitted.") | ||
|
||
# Make sure POST complete fails if old authentication is disabled | ||
response = self.client.post( | ||
self.get_complete_multipart_upload_path('upload_id'), | ||
headers=headers, | ||
data={'parts': 'parts'}, | ||
content_type="application/json" | ||
) | ||
self.assertStatusCode(401, response, msg="Unauthorized post was permitted.") | ||
|
||
def test_disabled_session_authentication(self): | ||
self.client.login(username=self.username, password=self.password) | ||
self.run_test() | ||
|
||
def test_disabled_token_authentication(self): | ||
token = Token.objects.create(user=self.user) | ||
headers = {'Authorization': f'Token {token.key}'} | ||
self.run_test(headers=headers) | ||
|
||
def test_disabled_base_authentication(self): | ||
token = b64encode(f'{self.username}:{self.password}'.encode()).decode() | ||
headers = {'Authorization': f'Basic {token}'} | ||
self.run_test(headers=headers) |
Oops, something went wrong.