Skip to content

Commit

Permalink
Reverse syncing from services to a resource server
Browse files Browse the repository at this point in the history
In the interest of trying to prevent as many failure scenarios as
possible (though this is NOT failproof as @AlanCoding points out below
in the comments), we wrap the REST call and the local commit in a
transaction (unless a transaction is already present).

For models which sync (users, orgs, teams), their `save()` method is
monkeypatched to wrap the whole save in a transaction, if necessary.

pre_delete and post_save signals are used for doing the actual sync.

In order to prevent sync loops, we sneak an internal attribute on
instances that we create/fetch on requests from a resource server. In
the syncing function, we look for this attribute and abort syncing if
it's there and `True`.

Unit tests (somewhat gnarly ones) are provided. They attempt to ensure
that we're calling the right REST client methods when we need to,
bailing out early when we need to, rolling back when we need to, and
committing when we need to. Coverage should be pretty darn close to
100%.

A new setting, `DISABLE_RESOURCE_SERVER_SYNC` works to disable the
reverse syncing. We use it in `test_app` to avoid syncing on all but
the specific reverse syncing tests (the ones written in this PR).

Signed-off-by: Rick Elrod <[email protected]>
  • Loading branch information
relrod committed Aug 9, 2024
1 parent c29dee1 commit 9462356
Show file tree
Hide file tree
Showing 15 changed files with 601 additions and 10 deletions.
10 changes: 6 additions & 4 deletions ansible_base/jwt_consumer/common/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ansible_base.lib.utils.auth import get_user_by_ansible_id
from ansible_base.lib.utils.translations import translatableConditionally as _
from ansible_base.resource_registry.models import Resource, ResourceType
from ansible_base.resource_registry.signals.handlers import no_reverse_sync

logger = logging.getLogger("ansible_base.jwt_consumer.common.auth")

Expand Down Expand Up @@ -120,10 +121,11 @@ def parse_jwt_token(self, request):
logger.warn(f"New user {self.user.username} created from JWT auth")
except IntegrityError as exc:
logger.warning(f'Existing user {self.token["user_data"]} is a conflict with local user, error: {exc}')
self.user, created = get_user_model().objects.update_or_create(
username=self.token["user_data"]['username'],
defaults=user_defaults,
)
with no_reverse_sync():
self.user, created = get_user_model().objects.update_or_create(
username=self.token["user_data"]['username'],
defaults=user_defaults,
)

setattr(self.user, "resource_api_actions", self.token.get("resource_api_actions", None))

Expand Down
14 changes: 14 additions & 0 deletions ansible_base/lib/utils/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from contextlib import contextmanager

from django.db import transaction


@contextmanager
def ensure_transaction():
    """
    Run the managed block inside a database transaction.

    If the connection returned by ``transaction.get_connection()`` is already
    inside an atomic block, the body simply runs as part of it. Otherwise a
    new ``transaction.atomic()`` block is opened around the body.
    """
    already_atomic = transaction.get_connection().in_atomic_block

    if already_atomic:
        # Someone further up the stack owns the transaction; just run the body.
        yield
        return

    with transaction.atomic():
        yield
51 changes: 50 additions & 1 deletion ansible_base/resource_registry/apps.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import logging

from django.apps import AppConfig
from django.conf import settings
from django.db.models import TextField, signals
from django.db.models.functions import Cast
from django.db.utils import IntegrityError

import ansible_base.lib.checks # noqa: F401 - register checks
from ansible_base.lib.utils.db import ensure_transaction

logger = logging.getLogger('ansible_base.resource_registry.apps')

Expand Down Expand Up @@ -93,6 +95,17 @@ def proxies_of_model(cls):
yield sub_cls


def _should_reverse_sync():
    """
    Decide whether reverse syncing to the resource server should be wired up.

    Syncing is enabled only when all of the following hold:

    * ``settings.DISABLE_RESOURCE_SERVER_SYNC`` is unset or falsy,
    * ``settings.RESOURCE_SERVER`` and ``settings.RESOURCE_SERVICE_PATH`` are
      both set to truthy values,
    * ``settings.RESOURCE_SERVER`` contains a truthy ``'SECRET_KEY'`` entry.

    Returns:
        bool: True when reverse sync should be enabled, False otherwise.
    """
    if getattr(settings, 'DISABLE_RESOURCE_SERVER_SYNC', False):
        return False

    resource_server = getattr(settings, 'RESOURCE_SERVER', None)
    if not resource_server:
        # Returning here (instead of falling through to the SECRET_KEY check)
        # avoids a TypeError when RESOURCE_SERVER is explicitly set to None.
        return False

    if not getattr(settings, 'RESOURCE_SERVICE_PATH', False):
        return False

    return 'SECRET_KEY' in resource_server and bool(resource_server['SECRET_KEY'])


def connect_resource_signals(sender, **kwargs):
from ansible_base.resource_registry.signals import handlers

Expand All @@ -103,6 +116,31 @@ def connect_resource_signals(sender, **kwargs):
signals.post_save.connect(handlers.update_resource, sender=cls)
signals.post_delete.connect(handlers.remove_resource, sender=cls)

if _should_reverse_sync():
signals.pre_save.connect(handlers.decide_to_sync_update, sender=cls)
signals.post_save.connect(handlers.sync_to_resource_server_post_save, sender=cls)
signals.pre_delete.connect(handlers.sync_to_resource_server_pre_delete, sender=cls)

# Wrap save() in a transaction and sync to resource server
cls._original_save = cls.save

# Avoid late binding issues
def save(self, *args, _original_save=cls._original_save, **kwargs):
with ensure_transaction():
_original_save(self, *args, **kwargs)

cls.save = save

# Wrap delete() in a transaction and remove from resource server
cls._original_delete = cls.delete

# Avoid late binding issues
def delete(self, *args, _original_delete=cls._original_delete, **kwargs):
with ensure_transaction():
_original_delete(self, *args, **kwargs)

cls.delete = delete


def disconnect_resource_signals(sender, **kwargs):
from ansible_base.resource_registry.signals import handlers
Expand All @@ -112,6 +150,18 @@ def disconnect_resource_signals(sender, **kwargs):
signals.post_save.disconnect(handlers.update_resource, sender=cls)
signals.post_delete.disconnect(handlers.remove_resource, sender=cls)

signals.pre_save.disconnect(handlers.decide_to_sync_update, sender=cls)
signals.post_save.disconnect(handlers.sync_to_resource_server_post_save, sender=cls)
signals.pre_delete.disconnect(handlers.sync_to_resource_server_pre_delete, sender=cls)

if hasattr(cls, '_original_save'):
cls.save = cls._original_save
del cls._original_save

if hasattr(cls, '_original_delete'):
cls.delete = cls._original_delete
del cls._original_delete


class ResourceRegistryConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
Expand All @@ -123,5 +173,4 @@ def ready(self):
connect_resource_signals(sender=None)
signals.pre_migrate.connect(disconnect_resource_signals, sender=self)
signals.post_migrate.connect(initialize_resources, sender=self)
# We need to re-connect signals for tests, because migrations are executed in the same process.
signals.post_migrate.connect(connect_resource_signals, sender=self)
19 changes: 15 additions & 4 deletions ansible_base/resource_registry/models/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,22 @@ def get_resource_for_object(cls, obj):
return cls.objects.get(object_id=obj.pk, content_type=ContentType.objects.get_for_model(obj).pk)

def delete_resource(self):
from ..signals.handlers import no_reverse_sync

if not self.content_type.resource_type.can_be_managed:
raise ValidationError({"resource_type": _(f"Resource type: {self.content_type.resource_type.name} cannot be managed by Resources.")})

with transaction.atomic():
self.content_object.delete()
with no_reverse_sync():
self.content_object.delete()
self.delete()

@classmethod
def create_resource(
cls, resource_type: ResourceType, resource_data: dict, ansible_id: Union[str, uuid.UUID, None] = None, service_id: Union[str, uuid.UUID, None] = None
):
from ..signals.handlers import no_reverse_sync

c_type = resource_type.content_type
serializer = resource_type.serializer_class(data=resource_data)
serializer.is_valid(raise_exception=True)
Expand All @@ -107,8 +112,10 @@ def create_resource(

with transaction.atomic():
ObjModel = c_type.model_class()
content_object = processor(ObjModel()).save(resource_data, is_new=True)
resource = cls.objects.get(object_id=content_object.pk, content_type=c_type)
content_object = processor(ObjModel())
with no_reverse_sync():
content_object.save(resource_data, is_new=True)
resource = cls.objects.get(object_id=content_object.instance.pk, content_type=c_type)

if ansible_id:
resource.ansible_id = ansible_id
Expand All @@ -119,6 +126,8 @@ def create_resource(
return resource

def update_resource(self, resource_data: dict, ansible_id=None, partial=False, service_id: Union[str, uuid.UUID, None] = None):
from ..signals.handlers import no_reverse_sync

resource_type = self.content_type.resource_type

serializer = resource_type.serializer_class(data=resource_data, partial=partial)
Expand All @@ -134,7 +143,9 @@ def update_resource(self, resource_data: dict, ansible_id=None, partial=False, s
self.service_id = service_id
self.save()

processor(self.content_object).save(resource_data)
content_object = processor(self.content_object)
with no_reverse_sync():
content_object.save(resource_data)


# This is a separate function so that it can work with models from apps in the
Expand Down
2 changes: 2 additions & 0 deletions ansible_base/resource_registry/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def _make_request(
kwargs["stream"] = stream

resp = requests.request(**kwargs)
logger.info(f"Response status code from {url}: {resp.status_code}")

if self.raise_if_bad_request:
resp.raise_for_status()
return resp
Expand Down
81 changes: 81 additions & 0 deletions ansible_base/resource_registry/signals/handlers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import threading
from contextlib import contextmanager
from functools import lru_cache

from ansible_base.resource_registry.models import Resource, init_resource_from_object
from ansible_base.resource_registry.registry import get_registry
from ansible_base.resource_registry.utils.sync_to_resource_server import sync_to_resource_server


@lru_cache(maxsize=1)
Expand Down Expand Up @@ -30,3 +33,81 @@ def update_resource(sender, instance, created, **kwargs):
except Resource.DoesNotExist:
resource = init_resource_from_object(instance)
resource.save()


# pre_save
def decide_to_sync_update(sender, instance, raw, using, update_fields, **kwargs):
    """
    A pre_save hook that decides whether or not to reverse-sync the instance
    based on which fields have changed.

    This has to run in pre_save because, when ``update_fields`` was not
    passed, the stored row must still be available to work out which fields
    changed.

    The decision is recorded on ``instance._skip_reverse_resource_sync``:
    ``True`` when none of the changed fields are fields of the resource
    type's serializer, ``False`` otherwise. The flag is written on every
    evaluated update so that a skip decision from an earlier ``save()`` of
    the same instance cannot suppress a later save that does touch a synced
    field.

    Nothing is recorded when the instance is being created, or when it has no
    resource / ansible_id.
    """

    if instance._state.adding:
        # We only concern ourselves with updates
        return

    try:
        if not getattr(instance, 'resource', None) or not instance.resource.ansible_id:
            # Nothing to decide here; return without touching the skip flag.
            return
    except Resource.DoesNotExist:
        # Accessing the resource can raise Resource.DoesNotExist if it doesn't exist.
        return

    fields_that_sync = set(instance.resource.content_type.resource_type.serializer_class().get_fields().keys())

    if update_fields is None:
        # If we're not given a useful update_fields, manually calculate the changed fields
        # at the cost of an extra query
        existing_instance = sender.objects.get(pk=instance.pk)
        changed_fields = {field for field in fields_that_sync if getattr(existing_instance, field) != getattr(instance, field)}
    else:
        # If we're given update_fields, we can just check those
        changed_fields = set(update_fields)

    # Always write the outcome (True *or* False) so a stale True left over from
    # a previous save of this same instance does not block this sync.
    instance._skip_reverse_resource_sync = not changed_fields.intersection(fields_that_sync)


class ReverseSyncEnabled(threading.local):
    """
    Thread-local on/off switch for reverse syncing.

    Being a ``threading.local`` subclass, every thread sees its own
    ``enabled`` attribute, initialised to ``True``. The object's truthiness
    mirrors that attribute, so it can be used directly in ``if`` checks.
    """

    def __init__(self) -> None:
        # Start out enabled in each thread.
        self.enabled = True

    def __bool__(self) -> bool:
        return self.enabled


# Shared flag object; no_reverse_sync() toggles it and the sync signal
# handlers check it before talking to the resource server.
reverse_sync_enabled = ReverseSyncEnabled()


@contextmanager
def no_reverse_sync():
    """
    Temporarily switch reverse syncing off for the current thread.

    ``reverse_sync_enabled.enabled`` is set to ``False`` for the duration of
    the ``with`` body and put back to whatever it was before on exit, even if
    the body raises. Restoring the previous value (rather than forcing
    ``True``) keeps nested uses well-behaved.
    """
    was_enabled, reverse_sync_enabled.enabled = reverse_sync_enabled.enabled, False
    try:
        yield
    finally:
        reverse_sync_enabled.enabled = was_enabled


# post_save
def sync_to_resource_server_post_save(sender, instance, created, update_fields, **kwargs):
    """
    post_save handler: push the saved instance to the resource server.

    Does nothing while reverse syncing is switched off (see
    ``no_reverse_sync``). Otherwise calls ``sync_to_resource_server`` with
    action ``"create"`` for newly created rows and ``"update"`` for others.
    """
    if reverse_sync_enabled:
        sync_to_resource_server(instance, "create" if created else "update")


# pre_delete
def sync_to_resource_server_pre_delete(sender, instance, **kwargs):
    """
    pre_delete handler: ask the resource server to delete the instance.

    Does nothing while reverse syncing is switched off (see
    ``no_reverse_sync``). The ansible_id is read from the instance's resource
    here and passed explicitly, as ``sync_to_resource_server`` requires for
    the ``"delete"`` action.

    If the instance has no resource at all there is nothing to remove on the
    resource server, so the handler returns quietly instead of letting the
    lookup error abort the local delete.
    """
    if not reverse_sync_enabled:
        return

    try:
        ansible_id = instance.resource.ansible_id
    except (AttributeError, Resource.DoesNotExist):
        # No resource was ever registered for this object; nothing to sync.
        return

    sync_to_resource_server(instance, "delete", ansible_id=ansible_id)
95 changes: 95 additions & 0 deletions ansible_base/resource_registry/utils/sync_to_resource_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import logging

from crum import get_current_user
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError

from ansible_base.resource_registry.models import Resource
from ansible_base.resource_registry.rest_client import ResourceRequestBody, get_resource_server_client

logger = logging.getLogger('ansible_base.resource_registry.utils.sync_to_resource_server')


def sync_to_resource_server(instance, action, ansible_id=None):
    """
    Use the resource server API to sync the resource across.

    Args:
        instance: The object being synced. It is expected to expose a
            ``resource`` with an ``ansible_id``; if it does not, an error is
            logged and nothing is sent.
        action: One of ``"create"``, ``"update"`` or ``"delete"``.
        ansible_id: Required when ``action`` is ``"delete"`` (the caller pulls
            it from the object before deleting it) and must be omitted for
            every other action, where it is taken from ``instance.resource``.
            For create, the resource is expected to exist before calling this
            function.

    Raises:
        ValueError: If ``ansible_id`` is passed for a create/update, or is
            missing for a delete.
        ValidationError: If the request to the resource server fails.
    """

    # The pre_save hook sets this when an update did not touch any synced
    # field; skipping here also avoids bouncing a change straight back to the
    # resource server.
    if getattr(instance, '_skip_reverse_resource_sync', False):
        logger.info(f"Skipping sync of resource {instance}")
        return

    # Argument validation is kept outside the try block below, which exists
    # only to deal with a missing resource.
    if action != "delete" and ansible_id is not None:
        raise ValueError("ansible_id should not be provided for create/update actions")
    if action == "delete" and ansible_id is None:
        raise ValueError("ansible_id should be provided for delete actions")

    try:
        if not getattr(instance, 'resource', None) or not instance.resource.ansible_id:
            # We can't sync if we don't have a resource and an ansible_id.
            logger.error(f"Resource {instance} does not have a resource or ansible_id")
            return
    except Resource.DoesNotExist:
        # Accessing the resource can raise Resource.DoesNotExist if it doesn't exist.
        logger.error(f"Resource {instance} does not have a resource")
        return

    user_ansible_id = None
    user = get_current_user()
    if user:
        # If we have a user, try to get their ansible_id and sync as them.
        # If they don't have one somehow, or if we don't have a user, sync with None and
        # let the resource server decide what to do.
        try:
            user_ansible_id = user.resource.ansible_id
        except (AttributeError, Resource.DoesNotExist):
            # A missing resource may surface as either exception type.
            logger.error(f"User {user} does not have a resource")
    else:
        logger.error("No user found, syncing to resource server with jwt_user_id=None")

    client = get_resource_server_client(
        settings.RESOURCE_SERVICE_PATH,
        jwt_user_id=user_ansible_id,
        raise_if_bad_request=True,
    )

    if action != "delete":
        ansible_id = instance.resource.ansible_id

    resource_type = instance.resource.content_type.resource_type
    data = resource_type.serializer_class(instance).data
    body = ResourceRequestBody(
        resource_type=resource_type.name,
        ansible_id=ansible_id,
        resource_data=data,
    )

    try:
        if action == "create":
            response = client.create_resource(body)
            payload = response.json()
            if isinstance(payload, dict):  # Mainly for tests... to avoid getting here with mock
                instance.resource.service_id = payload['service_id']
                instance.resource.save()
        elif action == "update":
            client.update_resource(ansible_id, body)
        elif action == "delete":
            client.delete_resource(ansible_id)
    except Exception as e:
        # Any failure is surfaced as a ValidationError; the original error is chained.
        logger.exception(f"Failed to sync {action} of resource {instance} ({ansible_id}) to resource server: {e}")
        raise ValidationError(_("Failed to sync resource to resource server")) from e
Loading

0 comments on commit 9462356

Please sign in to comment.