Reverse syncing from services to a resource server
In the interest of preventing as many failure scenarios as possible (though
this is NOT fail-proof, as @AlanCoding points out in the PR comments), we wrap
the REST call and the local commit in a transaction (unless a transaction is
already present).
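
For illustration, a minimal sketch of the pattern, using the `ensure_transaction()`
helper added in `ansible_base/lib/utils/db.py` below (`_save_and_sync` and
`do_rest_sync` are hypothetical names, not part of this change):

```python
from ansible_base.lib.utils.db import ensure_transaction


def _save_and_sync(instance, do_rest_sync):
    # Hypothetical illustration: the local write and the REST call share one
    # atomic block, so if the REST call raises, the local write is rolled
    # back, and nothing is synced if the local write fails first.
    with ensure_transaction():  # reuses an outer transaction if one is open
        instance.save()
        do_rest_sync(instance)
```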

For models which sync (users, orgs, teams), their `save()` method is
monkeypatched to wrap the whole save in a transaction, if one is not already
present.

pre_save, post_save, and pre_delete signals are used to decide on and perform
the actual sync.
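
Concretely, the connections made in `ansible_base.resource_registry.apps`
(shown in the diff below) are:

```python
signals.pre_save.connect(handlers.decide_to_sync_update, sender=cls)
signals.post_save.connect(handlers.sync_to_resource_server_post_save, sender=cls)
signals.pre_delete.connect(handlers.sync_to_resource_server_pre_delete, sender=cls)
```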

In order to prevent sync loops, we set an internal attribute
(`_skip_reverse_resource_sync`) on instances that we create/fetch on requests
from a resource server. In the syncing function, we look for this attribute and
abort syncing if it's there and `True`.
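
The guard is a plain attribute check at the top of the sync function (see
`ansible_base/resource_registry/utils/sync_to_resource_server.py` in this diff):

```python
def sync_to_resource_server(instance, action, ansible_id=None):
    if getattr(instance, '_skip_reverse_resource_sync', False):
        # The instance came from (or was just written by) the resource server;
        # syncing it back would loop, so bail out.
        return
    ...
```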

Unit tests (somewhat gnarly ones) are provided. They attempt to ensure that we
call the right REST client methods when we need to, bail out early when we need
to, roll back when we need to, and commit when we need to. Coverage should be
pretty darn close to 100%.

A new setting, `DISABLE_RESOURCE_SERVER_SYNC`, disables the reverse syncing. We
use it in `test_app` to avoid syncing in all but the specific reverse-syncing
tests (the ones written in this PR).
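
For example (illustrative only; the exact wiring in `test_app` may differ), the
flag can be flipped per test with Django's `override_settings`:

```python
from django.test import override_settings

# In the test settings, reverse syncing is disabled globally...
DISABLE_RESOURCE_SERVER_SYNC = True


# ...and re-enabled only for the reverse-sync tests, e.g.:
@override_settings(DISABLE_RESOURCE_SERVER_SYNC=False)
def test_user_update_is_reverse_synced():
    ...
```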

Signed-off-by: Rick Elrod <[email protected]>
relrod committed Aug 9, 2024
1 parent c29dee1 commit cc8e3b4
Showing 15 changed files with 555 additions and 8 deletions.
14 changes: 10 additions & 4 deletions ansible_base/jwt_consumer/common/auth.py
@@ -6,6 +6,7 @@
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.db.models import Model
from django.db.utils import IntegrityError
from rest_framework.authentication import BaseAuthentication
@@ -120,10 +121,15 @@ def parse_jwt_token(self, request):
logger.warn(f"New user {self.user.username} created from JWT auth")
except IntegrityError as exc:
logger.warning(f'Existing user {self.token["user_data"]} is a conflict with local user, error: {exc}')
self.user, created = get_user_model().objects.update_or_create(
username=self.token["user_data"]['username'],
defaults=user_defaults,
)

with transaction.atomic():
try:
self.user = get_user_model().objects.select_for_update().get(username=self.token["user_data"]['username'])
except get_user_model().DoesNotExist:
self.user = get_user_model()(**user_defaults)

self.user._skip_reverse_resource_sync = True
self.user.save()

setattr(self.user, "resource_api_actions", self.token.get("resource_api_actions", None))

14 changes: 14 additions & 0 deletions ansible_base/lib/utils/db.py
@@ -0,0 +1,14 @@
from contextlib import contextmanager

from django.db import transaction


@contextmanager
def ensure_transaction():
needs_new_transaction = not transaction.get_connection().in_atomic_block

if needs_new_transaction:
with transaction.atomic():
yield
else:
yield
50 changes: 50 additions & 0 deletions ansible_base/resource_registry/apps.py
@@ -1,11 +1,13 @@
import logging

from django.apps import AppConfig
from django.conf import settings
from django.db.models import TextField, signals
from django.db.models.functions import Cast
from django.db.utils import IntegrityError

import ansible_base.lib.checks # noqa: F401 - register checks
from ansible_base.lib.utils.db import ensure_transaction

logger = logging.getLogger('ansible_base.resource_registry.apps')

@@ -93,6 +95,17 @@ def proxies_of_model(cls):
yield sub_cls


def _should_reverse_sync():
enabled = not getattr(settings, 'DISABLE_RESOURCE_SERVER_SYNC', False)
for setting in ('RESOURCE_SERVER', 'RESOURCE_SERVICE_PATH'):
if not getattr(settings, setting, False):
enabled = False
break
if hasattr(settings, 'RESOURCE_SERVER') and ('SECRET_KEY' not in settings.RESOURCE_SERVER or not settings.RESOURCE_SERVER['SECRET_KEY']):
enabled = False
return enabled


def connect_resource_signals(sender, **kwargs):
from ansible_base.resource_registry.signals import handlers

@@ -103,6 +116,31 @@ def connect_resource_signals(sender, **kwargs):
signals.post_save.connect(handlers.update_resource, sender=cls)
signals.post_delete.connect(handlers.remove_resource, sender=cls)

if _should_reverse_sync():
signals.pre_save.connect(handlers.decide_to_sync_update, sender=cls)
signals.post_save.connect(handlers.sync_to_resource_server_post_save, sender=cls)
signals.pre_delete.connect(handlers.sync_to_resource_server_pre_delete, sender=cls)

# Wrap save() in a transaction and sync to resource server
cls._original_save = cls.save

# Avoid late binding issues
def save(self, *args, _original_save=cls._original_save, **kwargs):
with ensure_transaction():
_original_save(self, *args, **kwargs)

cls.save = save

# Wrap delete() in a transaction and remove from resource server
cls._original_delete = cls.delete

# Avoid late binding issues
def delete(self, *args, _original_delete=cls._original_delete, **kwargs):
with ensure_transaction():
_original_delete(self, *args, **kwargs)

cls.delete = delete


def disconnect_resource_signals(sender, **kwargs):
from ansible_base.resource_registry.signals import handlers
@@ -112,6 +150,18 @@ def disconnect_resource_signals(sender, **kwargs):
signals.post_save.disconnect(handlers.update_resource, sender=cls)
signals.post_delete.disconnect(handlers.remove_resource, sender=cls)

signals.pre_save.disconnect(handlers.decide_to_sync_update, sender=cls)
signals.post_save.disconnect(handlers.sync_to_resource_server_post_save, sender=cls)
signals.pre_delete.disconnect(handlers.sync_to_resource_server_pre_delete, sender=cls)

if hasattr(cls, '_original_save'):
cls.save = cls._original_save
del cls._original_save

if hasattr(cls, '_original_delete'):
cls.delete = cls._original_delete
del cls._original_delete


class ResourceRegistryConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
13 changes: 10 additions & 3 deletions ansible_base/resource_registry/models/resource.py
@@ -92,6 +92,7 @@ def delete_resource(self):
raise ValidationError({"resource_type": _(f"Resource type: {self.content_type.resource_type.name} cannot be managed by Resources.")})

with transaction.atomic():
self.content_object._skip_reverse_resource_sync = True # Don't try to sync it back to the server
self.content_object.delete()
self.delete()

@@ -107,8 +108,11 @@ def create_resource(

with transaction.atomic():
ObjModel = c_type.model_class()
content_object = processor(ObjModel()).save(resource_data, is_new=True)
resource = cls.objects.get(object_id=content_object.pk, content_type=c_type)
content_object = processor(ObjModel())
content_object.instance._skip_reverse_resource_sync = True # Don't try to sync it back to the server
content_object.save(resource_data, is_new=True)
del content_object.instance._skip_reverse_resource_sync
resource = cls.objects.get(object_id=content_object.instance.pk, content_type=c_type)

if ansible_id:
resource.ansible_id = ansible_id
@@ -134,7 +138,10 @@ def update_resource(self, resource_data: dict, ansible_id=None, partial=False, s
self.service_id = service_id
self.save()

processor(self.content_object).save(resource_data)
content_object = processor(self.content_object)
content_object.instance._skip_reverse_resource_sync = True # Don't try to sync it back to the server
content_object.save(resource_data)
del content_object.instance._skip_reverse_resource_sync


# This is a separate function so that it can work with models from apps in the
2 changes: 2 additions & 0 deletions ansible_base/resource_registry/rest_client.py
@@ -104,6 +104,8 @@ def _make_request(
kwargs["stream"] = stream

resp = requests.request(**kwargs)
logger.info(f"Response status code from {url}: {resp.status_code}")

if self.raise_if_bad_request:
resp.raise_for_status()
return resp
52 changes: 52 additions & 0 deletions ansible_base/resource_registry/signals/handlers.py
@@ -2,6 +2,7 @@

from ansible_base.resource_registry.models import Resource, init_resource_from_object
from ansible_base.resource_registry.registry import get_registry
from ansible_base.resource_registry.utils.sync_to_resource_server import sync_to_resource_server


@lru_cache(maxsize=1)
@@ -30,3 +31,54 @@ def update_resource(sender, instance, created, **kwargs):
except Resource.DoesNotExist:
resource = init_resource_from_object(instance)
resource.save()


# pre_save
def decide_to_sync_update(sender, instance, raw, using, update_fields, **kwargs):
"""
A pre_save hook that decides whether or not to reverse-sync the instance
based on which fields have changed.
This has to be in pre-save because we have to be able to get the original
instance to calculate which fields changed, if update_fields wasn't passed
"""

if instance._state.adding:
# We only concern ourselves with updates
return

try:
if not getattr(instance, 'resource', None) or not instance.resource.ansible_id:
# We can't sync here, but we want to log that, so let sync_to_resource_server() discard it.
return
except Resource.DoesNotExist:
# The getattr() will raise a Resource.DoesNotExist if the resource doesn't exist.
return

fields_that_sync = instance.resource.content_type.resource_type.serializer_class().get_fields().keys()

if update_fields is None:
# If we're not given a useful update_fields, manually calculate the changed fields
# at the cost of an extra query
existing_instance = sender.objects.get(pk=instance.pk)
changed_fields = set()
for field in fields_that_sync:
if getattr(existing_instance, field) != getattr(instance, field):
changed_fields.add(field)
else:
# If we're given update_fields, we can just check those
changed_fields = set(update_fields)

if not changed_fields.intersection(fields_that_sync):
instance._skip_reverse_resource_sync = True


# post_save
def sync_to_resource_server_post_save(sender, instance, created, update_fields, **kwargs):
action = "create" if created else "update"
sync_to_resource_server(instance, action)


# pre_delete
def sync_to_resource_server_pre_delete(sender, instance, **kwargs):
sync_to_resource_server(instance, "delete", ansible_id=instance.resource.ansible_id)
95 changes: 95 additions & 0 deletions ansible_base/resource_registry/utils/sync_to_resource_server.py
@@ -0,0 +1,95 @@
import logging

from crum import get_current_user
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError

from ansible_base.resource_registry.models import Resource
from ansible_base.resource_registry.rest_client import ResourceRequestBody, get_resource_server_client

logger = logging.getLogger('ansible_base.resource_registry.utils.sync_to_resource_server')


def sync_to_resource_server(instance, action, ansible_id=None):
"""
Use the resource server API to sync the resource across.
When action is "delete", the ansible_id is required, because by the time we
get here, we've already deleted the object and its resource. So we can't
pull the ansible_id from the resource object. It's on the caller to pull
the ansible_id from the object before deleting it.
For all other actions, ansible_id is ignored and retrieved from the resource
object. (For create, the resource is expected to exist before calling this
function.)
"""

# This gets set in Resource.create_resource() and friends (and jwt_consumer.common.auth...)
# Also from a pre_save hook that checks to see if the object has changed a synced field or not, for updates.
skip_sync = getattr(instance, '_skip_reverse_resource_sync', False)
if skip_sync:
# Avoid an infinite loop by not syncing resources that came from the resource server.
# Or avoid syncing unnecessarily, when a synced field hasn't changed.
logger.info(f"Skipping sync of resource {instance}")
return

try:
if action != "delete" and ansible_id is not None:
raise Exception("ansible_id should not be provided for create/update actions")
elif action == "delete" and ansible_id is None:
raise Exception("ansible_id should be provided for delete actions")
elif not getattr(instance, 'resource', None) or not instance.resource.ansible_id:
# We can't sync if we don't have a resource and an ansible_id.
logger.error(f"Resource {instance} does not have a resource or ansible_id")
return
except Resource.DoesNotExist:
# The getattr() will raise a Resource.DoesNotExist if the resource doesn't exist.
logger.error(f"Resource {instance} does not have a resource")
return

user_ansible_id = None
user = get_current_user()
if user:
# If we have a user, try to get their ansible_id and sync as them.
# If they don't have one somehow, or if we don't have a user, sync with None and
# let the resource server decide what to do.
try:
user_ansible_id = user.resource.ansible_id
except AttributeError:
logger.error(f"User {user} does not have a resource")
pass
else:
logger.error("No user found, syncing to resource server with jwt_user_id=None")

client = get_resource_server_client(
settings.RESOURCE_SERVICE_PATH,
jwt_user_id=user_ansible_id,
raise_if_bad_request=True,
)

if action != "delete":
ansible_id = instance.resource.ansible_id

resource_type = instance.resource.content_type.resource_type
data = resource_type.serializer_class(instance).data
body = ResourceRequestBody(
resource_type=resource_type.name,
ansible_id=ansible_id,
resource_data=data,
)

try:
if action == "create":
response = client.create_resource(body)
json = response.json()
if isinstance(json, dict): # Mainly for tests... to avoid getting here with mock
instance.resource.service_id = json['service_id']
instance.resource.save()
elif action == "update":
client.update_resource(ansible_id, body)
elif action == "delete":
client.delete_resource(ansible_id)
except Exception as e:
logger.exception(f"Failed to sync {action} of resource {instance} ({ansible_id}) to resource server: {e}")
raise ValidationError(_("Failed to sync resource to resource server")) from e
39 changes: 39 additions & 0 deletions docs/apps/resource_registry.md
@@ -356,6 +356,9 @@ RESOURCE_SERVER = {
# Optional
RESOURCE_JWT_USER_ID = "97447387-8596-404f-b0d0-6429b04c8d22"
RESOURCE_SERVICE_PATH = "/api/server/v1/service-index/"

# Optional, mainly for tests
DISABLE_RESOURCE_SERVER_SYNC = False
```

> NOTE: The secret key must be generated on the resource server, e.g. with the `generate_service_secret` management command.
@@ -561,3 +564,39 @@ Objects and types:
- `ManifestItem` - Serializer for resource manifest CSV containing ansible_id and resource_hash
- `get_resource_type_names` - List str of `shared.{organization,team,user}`
- `ManifestNotFound` - Custom exception for when manifest is not served

### Reverse-syncing (on-the-fly syncing of local service changes into the resource server)

The above "Configuration" section where `RESOURCE_SERVER` and some other
variables are defined, is also how reverse-syncing is configured.

Reverse-syncing means syncing resources from the local service into the
resource server, in real time (on the fly), as changes are made.

This works by monkeypatching `save()` (and `delete()`) on models that are shared
resources (users, orgs, teams). The patching is done in
`ansible_base.resource_registry.apps`. A reference to the original method is
saved so that the monkeypatch can be undone at runtime, if needed, by calling
`apps.disconnect_resource_signals()`. At startup, `apps.connect_resource_signals()`
is called, which applies the patch along with connecting the normal signals
described in earlier sections.
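
A condensed sketch of the patch (adapted from `connect_resource_signals()`;
`cls` stands for each shared model class being registered, and this is not a
drop-in copy of the real code):

```python
from ansible_base.lib.utils.db import ensure_transaction

cls._original_save = cls.save


def save(self, *args, _original_save=cls._original_save, **kwargs):
    # Binding _original_save as a default argument avoids late-binding issues
    # when several model classes are patched in the same loop.
    with ensure_transaction():
        _original_save(self, *args, **kwargs)


cls.save = save  # disconnect_resource_signals() restores cls._original_save
```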

At save time, we ensure we are in a transaction (creating one if necessary). The
original `save()` method is called first, because that allows the `Resource` to
be created and gives the object an `ansible_id`, which is important for syncing.
Then
`ansible_base.resource_registry.utils.sync_to_resource_server.sync_to_resource_server()`
is called.
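
The call happens from the `post_save` handler, which is short (see
`ansible_base/resource_registry/signals/handlers.py`):

```python
def sync_to_resource_server_post_save(sender, instance, created, update_fields, **kwargs):
    action = "create" if created else "update"
    sync_to_resource_server(instance, action)
```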

Delete time works similarly, the difference being that we extract the
`ansible_id` first, since it will no longer exist once the object is deleted
(which also deletes its `Resource`), and the resource server needs it to know
which resource to delete.
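
That is why the delete sync hangs off `pre_delete` rather than `post_delete`:
the `Resource` row (and its `ansible_id`) still exists at that point:

```python
def sync_to_resource_server_pre_delete(sender, instance, **kwargs):
    # The Resource and its ansible_id are still available before the delete runs.
    sync_to_resource_server(instance, "delete", ansible_id=instance.resource.ansible_id)
```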

The purpose of using a transaction here is so that we can roll back if we are
unable to sync to the resource server, and so that we don't sync to the resource
server if an error happens when trying to save to the local service's database.
However, it's important to note that this is not fail-proof: there are cases
where we can still sync something to the resource server and end up not being
able to commit it locally. In that event, the next periodic sync should sync the
object back down to the service.
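
As a hedged sketch of the failure behaviour (illustrative only; it assumes
`User` is one of the synced, patched models and that the resource server
rejects the request):

```python
# If the resource server rejects the sync, sync_to_resource_server() raises a
# DRF ValidationError, which propagates out of the atomic block opened by the
# patched save(), so the local UPDATE is rolled back as well.
from django.contrib.auth import get_user_model
from rest_framework.exceptions import ValidationError

User = get_user_model()
user = User.objects.get(username="alice")
user.first_name = "Alicia"
try:
    user.save()  # local write + REST call share one transaction
except ValidationError:
    pass  # nothing was committed locally; the change was rolled back
```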
18 changes: 18 additions & 0 deletions test_app/migrations/0015_organization_extra_field.py
@@ -0,0 +1,18 @@
# Generated by Django 4.2.11 on 2024-08-08 01:19

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('test_app', '0014_autoextrauuidmodel_manualextrauuidmodel_and_more'),
]

operations = [
migrations.AddField(
model_name='organization',
name='extra_field',
field=models.CharField(max_length=100, null=True),
),
]