From a1b74ecd52856acdec68a3741d0701afc54c73c0 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 9 Jan 2025 16:19:49 +0000 Subject: [PATCH 1/3] Remove viewer access in campaigns app --- temba/api/v2/tests/test_campaign_events.py | 6 +- temba/api/v2/tests/test_campaigns.py | 4 +- temba/campaigns/tests/test_campaign.py | 498 -------------------- temba/campaigns/tests/test_campaigncrudl.py | 108 +++-- temba/campaigns/tests/test_eventcrudl.py | 4 +- temba/flows/tests/test_flowcrudl.py | 30 +- temba/settings_common.py | 5 - 7 files changed, 95 insertions(+), 560 deletions(-) diff --git a/temba/api/v2/tests/test_campaign_events.py b/temba/api/v2/tests/test_campaign_events.py index 003891fcbe7..c6db3e7d985 100644 --- a/temba/api/v2/tests/test_campaign_events.py +++ b/temba/api/v2/tests/test_campaign_events.py @@ -15,8 +15,8 @@ def test_endpoint(self, mr_mocks): endpoint_url = reverse("api.v2.campaign_events") + ".json" self.assertGetNotPermitted(endpoint_url, [None, self.agent]) - self.assertPostNotPermitted(endpoint_url, [None, self.user, self.agent]) - self.assertDeleteNotPermitted(endpoint_url, [None, self.user, self.agent]) + self.assertPostNotPermitted(endpoint_url, [None, self.agent]) + self.assertDeleteNotPermitted(endpoint_url, [None, self.agent]) joe = self.create_contact("Joe Blow", phone="+250788123123") frank = self.create_contact("Frank", urns=["facebook:123456"]) @@ -63,7 +63,7 @@ def test_endpoint(self, mr_mocks): # no filtering self.assertGet( endpoint_url, - [self.user, self.editor, self.admin], + [self.editor, self.admin], results=[ { "uuid": str(event3.uuid), diff --git a/temba/api/v2/tests/test_campaigns.py b/temba/api/v2/tests/test_campaigns.py index 6d1ee41ea6a..5d31d35433b 100644 --- a/temba/api/v2/tests/test_campaigns.py +++ b/temba/api/v2/tests/test_campaigns.py @@ -12,7 +12,7 @@ def test_endpoint(self): endpoint_url = reverse("api.v2.campaigns") + ".json" self.assertGetNotPermitted(endpoint_url, [None, self.agent]) - self.assertPostNotPermitted(endpoint_url, [None, self.user, self.agent]) + self.assertPostNotPermitted(endpoint_url, [None, self.agent]) self.assertDeleteNotAllowed(endpoint_url) joe = self.create_contact("Joe Blow", phone="+250788123123") @@ -29,7 +29,7 @@ def test_endpoint(self): # no filtering response = self.assertGet( endpoint_url, - [self.user, self.editor, self.admin], + [self.editor, self.admin], results=[ { "uuid": str(campaign2.uuid), diff --git a/temba/campaigns/tests/test_campaign.py b/temba/campaigns/tests/test_campaign.py index 098ccc463f7..711227eafbf 100644 --- a/temba/campaigns/tests/test_campaign.py +++ b/temba/campaigns/tests/test_campaign.py @@ -5,14 +5,12 @@ from django.conf import settings from django.core.exceptions import ValidationError from django.core.files.storage import default_storage -from django.urls import reverse from django.utils import timezone from temba.campaigns.models import Campaign, CampaignEvent, EventFire from temba.campaigns.tasks import trim_event_fires from temba.contacts.models import ContactField from temba.flows.models import Flow -from temba.msgs.models import Msg from temba.orgs.models import DefinitionExport, Org from temba.tests import TembaTest, matchers, mock_mailroom @@ -254,502 +252,6 @@ def test_trim_event_fires(self): e = EventFire.objects.get() self.assertEqual(e.id, e2.id) - @mock_mailroom - def test_views(self, mr_mocks): - open_tickets = self.org.groups.get(name="Open Tickets") - - current_year = timezone.now().year - - # update the planting date for our contacts - self.set_contact_field(self.farmer1, "planting_date", f"1/10/{current_year-2}") - - # don't log in, try to create a new campaign - response = self.client.get(reverse("campaigns.campaign_create")) - self.assertLoginRedirect(response) - - # ok log in as an org - self.login(self.admin) - - # go to to the creation page - response = self.client.get(reverse("campaigns.campaign_create")) - self.assertEqual(200, response.status_code) - - # groups shouldn't include the group that isn't ready - self.assertEqual({open_tickets, self.farmers}, set(response.context["form"].fields["group"].queryset)) - - response = self.client.post( - reverse("campaigns.campaign_create"), {"name": "Planting Reminders", "group": self.farmers.id} - ) - - # should redirect to read page for this campaign - campaign = Campaign.objects.filter(is_active=True).first() - self.assertRedirect(response, reverse("campaigns.campaign_read", args=[campaign.uuid])) - - # go to the list page, should be there as well - response = self.client.get(reverse("campaigns.campaign_list")) - self.assertContains(response, "Planting Reminders") - - # try searching for the campaign by group name - response = self.client.get(reverse("campaigns.campaign_list") + "?search=farmers") - self.assertContains(response, "Planting Reminders") - - # test no match - response = self.client.get(reverse("campaigns.campaign_list") + "?search=factory") - self.assertNotContains(response, "Planting Reminders") - - # archive a campaign - self.client.post(reverse("campaigns.campaign_list"), {"action": "archive", "objects": campaign.id}) - response = self.client.get(reverse("campaigns.campaign_list")) - self.assertNotContains(response, "Planting Reminders") - - # restore the campaign - response = self.client.get(reverse("campaigns.campaign_archived")) - self.assertContains(response, "Planting Reminders") - self.client.post(reverse("campaigns.campaign_archived"), {"action": "restore", "objects": campaign.id}) - response = self.client.get(reverse("campaigns.campaign_archived")) - self.assertNotContains(response, "Planting Reminders") - response = self.client.get(reverse("campaigns.campaign_list")) - self.assertContains(response, "Planting Reminders") - - # test viewers cannot use action archive or restore - self.client.logout() - - # login as a viewer - self.login(self.user) - - # go to the list page, should be there as well - response = self.client.get(reverse("campaigns.campaign_list")) - self.assertContains(response, "Planting Reminders") - - # cannot archive a campaign - post_data = dict(action="archive", objects=campaign.pk) - self.client.post(reverse("campaigns.campaign_list"), post_data) - response = self.client.get(reverse("campaigns.campaign_list")) - self.assertContains(response, "Planting Reminders") - response = self.client.get(reverse("campaigns.campaign_archived")) - self.assertNotContains(response, "Planting Reminders") - - self.client.logout() - self.login(self.admin) - - # see if we can create a new event, should see both sms and voice flows - response = self.client.get(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk) - self.assertEqual(200, response.status_code) - self.assertContains(response, self.reminder_flow.name) - self.assertContains(response, self.voice_flow.name) - - # 'Created On' system field must be selectable in the form - contact_fields = [field.key for field in response.context["form"].fields["relative_to"].queryset] - self.assertEqual(contact_fields, ["created_on", "last_seen_on", "planting_date"]) - - post_data = dict( - relative_to=self.planting_date.pk, - delivery_hour=-1, - base="", - direction="A", - offset=2, - unit="D", - event_type="M", - flow_to_start=self.reminder_flow.pk, - flow_start_mode="I", - ) - response = self.client.post(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk, post_data) - - self.assertTrue(response.context["form"].errors) - self.assertIn("A message is required", str(response.context["form"].errors["__all__"])) - - post_data = dict( - relative_to=self.planting_date.pk, - delivery_hour=-1, - eng="allo!" * 500, - direction="A", - offset=2, - unit="D", - event_type="M", - flow_to_start=self.reminder_flow.pk, - flow_start_mode="I", - ) - - response = self.client.post(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk, post_data) - - self.assertFormError( - response.context["form"], None, f"Translation for 'English' exceeds the {Msg.MAX_TEXT_LEN} character limit." - ) - - post_data = dict( - relative_to=self.planting_date.pk, - delivery_hour=-1, - base="", - direction="A", - offset=2, - unit="D", - event_type="F", - flow_start_mode="I", - ) - response = self.client.post(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk, post_data) - - self.assertFormError(response.context["form"], "flow_to_start", "This field is required.") - - post_data = dict( - relative_to=self.planting_date.pk, - delivery_hour=-1, - base="", - direction="A", - offset=2, - unit="D", - event_type="F", - flow_to_start=self.reminder_flow.pk, - flow_start_mode="I", - ) - response = self.client.post(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk, post_data) - - event = CampaignEvent.objects.filter(is_active=True).get() - # should be redirected back to our campaign read page - self.assertRedirect(response, reverse("campaigns.campaign_read", args=[campaign.uuid])) - - self.assertEqual(self.reminder_flow, event.flow) - self.assertEqual(self.planting_date, event.relative_to) - self.assertEqual(2, event.offset) - self.assertEqual("I", event.start_mode) - - # read the campaign read page - response = self.client.get(reverse("campaigns.campaign_read", args=[campaign.uuid])) - self.assertContains(response, "Reminder Flow") - self.assertContains(response, "1") - - # should have queued a scheduling task to mailroom - self.assertEqual( - [ - { - "org_id": self.org.id, - "type": "schedule_campaign_event", - "queued_on": matchers.Datetime(), - "task": {"campaign_event_id": event.id, "org_id": self.org.id}, - } - ], - mr_mocks.queued_batch_tasks, - ) - - post_data = dict( - relative_to=self.planting_date.pk, - delivery_hour=15, - base="", - direction="A", - offset=1, - unit="D", - event_type="F", - flow_to_start=self.reminder_flow.pk, - flow_start_mode="I", - ) - response = self.client.post(reverse("campaigns.campaignevent_update", args=[event.pk]), post_data) - - # should have queued another scheduling task to mailroom - self.assertEqual(2, len(mr_mocks.queued_batch_tasks)) - self.assertEqual("schedule_campaign_event", mr_mocks.queued_batch_tasks[-1]["type"]) - - # should be redirected back to our campaign event read page - event = CampaignEvent.objects.filter(is_active=True).get() - self.assertRedirect(response, reverse("campaigns.campaignevent_read", args=[event.campaign.uuid, event.pk])) - - # should now have update the campaign event - self.assertEqual(self.reminder_flow, event.flow) - self.assertEqual(self.planting_date, event.relative_to) - self.assertEqual(1, event.offset) - self.assertEqual("I", event.start_mode) - - # flow event always set exec mode to 'F' no matter what - post_data = dict( - relative_to=self.planting_date.pk, - delivery_hour=15, - base="", - direction="A", - offset=1, - unit="D", - event_type="F", - flow_to_start=self.reminder_flow.pk, - flow_start_mode="S", - ) - response = self.client.post(reverse("campaigns.campaignevent_update", args=[event.pk]), post_data) - - # should be redirected to our new event - previous_event = event - event = CampaignEvent.objects.filter(is_active=True).get() - - # reading our old event should redirect to the campaign page - response = self.client.get( - reverse("campaigns.campaignevent_read", args=[previous_event.campaign.uuid, previous_event.pk]) - ) - self.assertRedirect(response, reverse("campaigns.campaign_read", args=[previous_event.campaign.uuid])) - - # attempting to update our old event gives a 404 - response = self.client.post(reverse("campaigns.campaignevent_update", args=[previous_event.pk]), post_data) - self.assertEqual(404, response.status_code) - - # should now have update the campaign event - self.assertEqual(self.reminder_flow, event.flow) - self.assertEqual(self.planting_date, event.relative_to) - self.assertEqual(1, event.offset) - self.assertEqual("S", event.start_mode) - - # should have queued another scheduling task to mailroom - self.assertEqual(3, len(mr_mocks.queued_batch_tasks)) - self.assertEqual("schedule_campaign_event", mr_mocks.queued_batch_tasks[-1]["type"]) - - post_data = dict( - relative_to=self.planting_date.pk, - delivery_hour=15, - base="", - direction="A", - offset=2, - unit="D", - event_type="F", - flow_to_start=self.reminder2_flow.pk, - flow_start_mode="I", - ) - self.client.post(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk, post_data) - - # should have queued another scheduling task to mailroom - self.assertEqual(4, len(mr_mocks.queued_batch_tasks)) - - # trying to archive our flow should fail since it belongs to a campaign - post_data = dict(action="archive", objects=[self.reminder_flow.pk]) - response = self.client.post(reverse("flows.flow_list"), post_data) - self.reminder_flow.refresh_from_db() - self.assertFalse(self.reminder_flow.is_archived) - # TODO: Convert to temba-toast - # self.assertEqual( - # "The following flows are still used by campaigns so could not be archived: Reminder Flow", - # response.get("Temba-Toast"), - # ) - - post_data = dict(action="archive", objects=[self.reminder_flow.pk, self.reminder2_flow.pk]) - response = self.client.post(reverse("flows.flow_list"), post_data) - # self.assertEqual( - # "The following flows are still used by campaigns so could not be archived: Planting Reminder, Reminder Flow", - # response.get("Temba-Toast"), - # ) - - for e in CampaignEvent.objects.filter(flow=self.reminder2_flow.pk): - e.release(self.admin) - - # archive the campaign - post_data = dict(action="archive", objects=campaign.pk) - self.client.post(reverse("campaigns.campaign_list"), post_data) - response = self.client.get(reverse("campaigns.campaign_list")) - self.assertNotContains(response, "Planting Reminders") - - # should not have queued another scheduling task to mailroom since campaign is now archived - self.assertEqual(4, len(mr_mocks.queued_batch_tasks)) - - # shouldn't have any active event fires - self.assertFalse(EventFire.objects.filter(event__is_active=True).exists()) - - # restore the campaign - post_data = dict(action="restore", objects=campaign.pk) - self.client.post(reverse("campaigns.campaign_archived"), post_data) - - # should have queued another scheduling task to mailroom - self.assertEqual(5, len(mr_mocks.queued_batch_tasks)) - - # set a planting date on our other farmer - self.set_contact_field(self.farmer2, "planting_date", f"1/6/{current_year+1}") - - # should have an event fire now - fires = EventFire.objects.filter(event__is_active=True) - self.assertEqual(1, len(fires)) - - # setting a planting date on our outside contact has no effect - self.set_contact_field(self.nonfarmer, "planting_date", f"1/7/{current_year+3}") - self.assertEqual(1, EventFire.objects.filter(event__is_active=True).count()) - - self.set_contact_field(self.farmer1, "planting_date", f"4/8/{current_year-2}") - - event = CampaignEvent.objects.filter(is_active=True).first() - - # get the detail page of the event - response = self.client.get(reverse("campaigns.campaignevent_read", args=[event.campaign.uuid, event.id])) - self.assertEqual(200, response.status_code) - self.assertEqual(response.context["scheduled_event_fires_count"], 0) - self.assertEqual(len(response.context["scheduled_event_fires"]), 1) - - # delete the event - self.client.post(reverse("campaigns.campaignevent_delete", args=[event.id]), dict()) - self.assertFalse(CampaignEvent.objects.filter(is_active=True).exists()) - response = self.client.get(reverse("campaigns.campaign_read", args=[campaign.uuid])) - self.assertNotContains(response, "Color Flow") - - post_data = dict( - relative_to=self.planting_date.pk, - delivery_hour=-1, - base="", - direction="A", - offset=2, - unit="D", - event_type="F", - flow_start_mode="I", - flow_to_start=self.background_flow.pk, - ) - response = self.client.post(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk, post_data) - - # events created with background flows are always passive start mode - event = CampaignEvent.objects.filter(is_active=True).get() - self.assertEqual(CampaignEvent.MODE_PASSIVE, event.start_mode) - - def test_view_campaign_cant_modify_inactive_or_archive(self): - self.login(self.admin) - - campaign = Campaign.create(self.org, self.admin, "Planting Reminders", self.farmers) - - response = self.client.get(reverse("campaigns.campaign_update", args=[campaign.id])) - - # sanity check, form is available in the response - self.assertContains(response, "Planting Reminders") - self.assertListEqual(list(response.context["form"].fields.keys()), ["name", "group", "loc"]) - - # archive the campaign - campaign.is_archived = True - campaign.save() - - response = self.client.get(reverse("campaigns.campaign_update", args=[campaign.id])) - - # we should get 404 for the archived campaign - self.assertEqual(response.status_code, 404) - - # deactivate the campaign - campaign.is_archived = False - campaign.is_active = False - campaign.save() - - response = self.client.get(reverse("campaigns.campaign_update", args=[campaign.pk])) - - # we should get 404 for the inactive campaign - self.assertEqual(response.status_code, 404) - - def test_view_campaign_archive(self): - self.login(self.admin) - - post_data = dict(name="Planting Reminders", group=self.farmers.pk) - self.client.post(reverse("campaigns.campaign_create"), post_data) - - campaign = Campaign.objects.filter(is_active=True).first() - - # archive the campaign - response = self.client.post(reverse("campaigns.campaign_archive", args=[campaign.pk])) - - self.assertRedirect(response, f"/campaign/read/{campaign.uuid}/") - - campaign.refresh_from_db() - self.assertTrue(campaign.is_archived) - - def test_view_campaign_activate(self): - self.login(self.admin) - - post_data = dict(name="Planting Reminders", group=self.farmers.pk) - self.client.post(reverse("campaigns.campaign_create"), post_data) - - campaign = Campaign.objects.filter(is_active=True).first() - - # activate the campaign - response = self.client.post(reverse("campaigns.campaign_activate", args=[campaign.pk])) - - self.assertRedirect(response, f"/campaign/read/{campaign.uuid}/") - - campaign.refresh_from_db() - self.assertFalse(campaign.is_archived) - - def test_view_campaignevent_update_on_archived_campaign(self): - self.login(self.admin) - - campaign = Campaign.create(self.org, self.admin, "Planting Reminders", self.farmers) - - # create a reminder for our first planting event - event = CampaignEvent.create_flow_event( - self.org, self.admin, campaign, relative_to=self.planting_date, offset=3, unit="D", flow=self.reminder_flow - ) - - response = self.client.get(reverse("campaigns.campaignevent_update", args=[event.pk])) - - # sanity check, form is available in the response - self.assertContains(response, "Planting Reminder") - - self.assertEqual( - set(response.context["form"].fields.keys()), - { - "offset", - "unit", - "relative_to", - "event_type", - "delivery_hour", - "direction", - "flow_to_start", - "flow_start_mode", - "message_start_mode", - "eng", - "kin", - "loc", - }, - ) - - # archive the campaign - campaign.is_archived = True - campaign.save() - - response = self.client.get(reverse("campaigns.campaignevent_update", args=[campaign.pk])) - - # we should get 404 for the archived campaign - self.assertEqual(response.status_code, 404) - - # deactivate the campaign - campaign.is_archived = False - campaign.is_active = False - campaign.save() - - response = self.client.get(reverse("campaigns.campaign_update", args=[campaign.pk])) - - # we should get 404 for the inactive campaign - self.assertEqual(response.status_code, 404) - - def test_view_campaignevent_create_on_archived_campaign(self): - self.login(self.admin) - - campaign = Campaign.create(self.org, self.admin, "Planting Reminders", self.farmers) - - post_data = dict( - relative_to=self.planting_date.pk, - event_type="M", - eng="This is my message", - kin="muraho", - direction="B", - offset=1, - unit="W", - flow_to_start="", - delivery_hour=13, - message_start_mode="I", - ) - - response = self.client.post(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk, post_data) - - self.assertRedirect(response, reverse("campaigns.campaign_read", args=[campaign.uuid])) - - # archive the campaign - campaign.is_archived = True - campaign.save() - - response = self.client.post(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk, post_data) - - # we should get 404 for the archived campaign - self.assertEqual(response.status_code, 404) - - # deactivate the campaign - campaign.is_archived = False - campaign.is_active = False - campaign.save() - - response = self.client.post(reverse("campaigns.campaignevent_create") + "?campaign=%d" % campaign.pk, post_data) - - # we should get 404 for the inactive campaign - self.assertEqual(response.status_code, 404) - def test_eventfire_get_relative_to_value(self): campaign = Campaign.create(self.org, self.admin, "Planting Reminders", self.farmers) created_on = self.org.fields.get(key="created_on") diff --git a/temba/campaigns/tests/test_campaigncrudl.py b/temba/campaigns/tests/test_campaigncrudl.py index 912aeb57da2..b2205720006 100644 --- a/temba/campaigns/tests/test_campaigncrudl.py +++ b/temba/campaigns/tests/test_campaigncrudl.py @@ -35,7 +35,7 @@ def test_create(self): create_url = reverse("campaigns.campaign_create") - self.assertRequestDisallowed(create_url, [None, self.user, self.agent]) + self.assertRequestDisallowed(create_url, [None, self.agent]) self.assertCreateFetch(create_url, [self.editor, self.admin], form_fields=["name", "group"]) # try to submit with no data @@ -60,7 +60,7 @@ def test_read(self): read_url = reverse("campaigns.campaign_read", args=[campaign.uuid]) self.assertRequestDisallowed(read_url, [None, self.agent, self.admin2]) - response = self.assertReadFetch(read_url, [self.user, self.editor, self.admin], context_object=campaign) + response = self.assertReadFetch(read_url, [self.editor, self.admin], context_object=campaign) self.assertContains(response, "Welcomes") self.assertContains(response, "Registered") @@ -70,41 +70,6 @@ def test_read(self): self.assertContentMenu(read_url, self.admin, ["Activate", "Export"]) - def test_archive_and_activate(self): - group = self.create_group("Reporters", contacts=[]) - campaign = self.create_campaign(self.org, "Welcomes", group) - other_org_group = self.create_group("Reporters", contacts=[], org=self.org2) - other_org_campaign = self.create_campaign(self.org2, "Welcomes", other_org_group) - - archive_url = reverse("campaigns.campaign_archive", args=[campaign.id]) - - # can't archive campaign if not logged in - response = self.client.post(archive_url) - self.assertLoginRedirect(response) - - self.login(self.admin) - - response = self.client.post(archive_url) - self.assertEqual(302, response.status_code) - - campaign.refresh_from_db() - self.assertTrue(campaign.is_archived) - - # activate that archve - response = self.client.post(reverse("campaigns.campaign_activate", args=[campaign.id])) - self.assertEqual(302, response.status_code) - - campaign.refresh_from_db() - self.assertFalse(campaign.is_archived) - - # can't archive campaign from other org - response = self.client.post(reverse("campaigns.campaign_archive", args=[other_org_campaign.id])) - self.assertEqual(302, response.status_code) - - # check object is unchanged - other_org_campaign.refresh_from_db() - self.assertFalse(other_org_campaign.is_archived) - @mock_mailroom def test_update(self, mr_mocks): group1 = self.create_group("Reporters", contacts=[]) @@ -114,7 +79,7 @@ def test_update(self, mr_mocks): update_url = reverse("campaigns.campaign_update", args=[campaign.id]) - self.assertRequestDisallowed(update_url, [None, self.user, self.agent, self.admin2]) + self.assertRequestDisallowed(update_url, [None, self.agent, self.admin2]) self.assertUpdateFetch( update_url, [self.editor, self.admin], form_fields={"name": "Welcomes", "group": group1.id} ) @@ -157,17 +122,76 @@ def test_update(self, mr_mocks): mr_mocks.queued_batch_tasks[1], ) + # can't update archived campaign + campaign.archive(self.admin) + + self.assertRequestDisallowed(update_url, [self.admin]) + def test_list(self): + list_url = reverse("campaigns.campaign_list") + group = self.create_group("Reporters", contacts=[]) campaign1 = self.create_campaign(self.org, "Welcomes", group) campaign2 = self.create_campaign(self.org, "Follow Ups", group) + campaign3 = self.create_campaign(self.org, "Reminders", group) + campaign3.archive(self.admin) other_org_group = self.create_group("Reporters", contacts=[], org=self.org2) self.create_campaign(self.org2, "Welcomes", other_org_group) - list_url = reverse("campaigns.campaign_list") - self.assertRequestDisallowed(list_url, [None, self.agent]) - self.assertListFetch(list_url, [self.user, self.editor, self.admin], context_objects=[campaign2, campaign1]) - self.assertContentMenu(list_url, self.user, []) + self.assertListFetch(list_url, [self.editor, self.admin], context_objects=[campaign2, campaign1]) self.assertContentMenu(list_url, self.admin, ["New Campaign"]) + + def test_archived(self): + archived_url = reverse("campaigns.campaign_archived") + + group = self.create_group("Reporters", contacts=[]) + campaign1 = self.create_campaign(self.org, "Welcomes", group) + campaign2 = self.create_campaign(self.org, "Follow Ups", group) + self.create_campaign(self.org, "Reminders", group) + + other_org_group = self.create_group("Reporters", contacts=[], org=self.org2) + self.create_campaign(self.org2, "Welcomes", other_org_group) + + campaign1.archive(self.admin) + campaign2.archive(self.admin) + + self.assertRequestDisallowed(archived_url, [None, self.agent]) + self.assertListFetch(archived_url, [self.editor, self.admin], context_objects=[campaign2, campaign1]) + self.assertContentMenu(archived_url, self.admin, []) + + def test_archive_and_activate(self): + group = self.create_group("Reporters", contacts=[]) + campaign = self.create_campaign(self.org, "Welcomes", group) + other_org_group = self.create_group("Reporters", contacts=[], org=self.org2) + other_org_campaign = self.create_campaign(self.org2, "Welcomes", other_org_group) + + archive_url = reverse("campaigns.campaign_archive", args=[campaign.id]) + + # can't archive campaign if not logged in + response = self.client.post(archive_url) + self.assertLoginRedirect(response) + + self.login(self.admin) + + response = self.client.post(archive_url) + self.assertEqual(302, response.status_code) + + campaign.refresh_from_db() + self.assertTrue(campaign.is_archived) + + # activate that archve + response = self.client.post(reverse("campaigns.campaign_activate", args=[campaign.id])) + self.assertEqual(302, response.status_code) + + campaign.refresh_from_db() + self.assertFalse(campaign.is_archived) + + # can't archive campaign from other org + response = self.client.post(reverse("campaigns.campaign_archive", args=[other_org_campaign.id])) + self.assertEqual(302, response.status_code) + + # check object is unchanged + other_org_campaign.refresh_from_db() + self.assertFalse(other_org_campaign.is_archived) diff --git a/temba/campaigns/tests/test_eventcrudl.py b/temba/campaigns/tests/test_eventcrudl.py index cb7bedcae42..f28d6f6b0f0 100644 --- a/temba/campaigns/tests/test_eventcrudl.py +++ b/temba/campaigns/tests/test_eventcrudl.py @@ -40,7 +40,7 @@ def test_read(self): read_url = reverse("campaigns.campaignevent_read", args=[event.campaign.uuid, event.id]) self.assertRequestDisallowed(read_url, [None, self.agent, self.admin2]) - response = self.assertReadFetch(read_url, [self.user, self.editor, self.admin], context_object=event) + response = self.assertReadFetch(read_url, [self.editor, self.admin], context_object=event) self.assertContains(response, "Welcomes") self.assertContains(response, "1 week after") @@ -89,7 +89,7 @@ def test_create(self): "message_start_mode", ] - self.assertRequestDisallowed(create_url, [None, self.user, self.agent]) + self.assertRequestDisallowed(create_url, [None, self.agent]) response = self.assertCreateFetch(create_url, [self.editor, self.admin], form_fields=non_lang_fields + ["eng"]) self.assertEqual(3, len(response.context["form"].fields["message_start_mode"].choices)) diff --git a/temba/flows/tests/test_flowcrudl.py b/temba/flows/tests/test_flowcrudl.py index 68135e3e293..1dba8e2e18e 100644 --- a/temba/flows/tests/test_flowcrudl.py +++ b/temba/flows/tests/test_flowcrudl.py @@ -9,6 +9,7 @@ from django.urls import reverse from temba import mailroom +from temba.campaigns.models import Campaign, CampaignEvent from temba.contacts.models import URN from temba.flows.models import Flow, FlowLabel, FlowStart, FlowUserConflictException, ResultsExport from temba.msgs.models import SystemLabel @@ -566,20 +567,33 @@ def test_list_views(self): flow2.is_archived = True flow2.save(update_fields=("is_archived",)) + # create flow used by a campaign + group = self.create_group("Reporters", contacts=[]) flow3 = self.create_flow("Flow 3") + campaign = Campaign.create(self.org, self.admin, "Reminders", group) + registered = self.create_field("registered", "Registered", value_type="D") + CampaignEvent.create_flow_event( + self.org, self.admin, campaign, registered, offset=1, unit="W", flow=flow3, delivery_hour="13" + ) - self.login(self.admin) + list_url = reverse("flows.flow_list") - # see our trigger on the list page - response = self.client.get(reverse("flows.flow_list")) - self.assertContains(response, flow1.name) - self.assertContains(response, flow3.name) + self.assertRequestDisallowed(list_url, [None, self.agent]) + self.assertListFetch(list_url, [self.editor, self.admin], context_objects=[flow3, flow1]) + + # try to archive flow used by campaign + response = self.client.post(list_url, {"action": "archive", "objects": flow3.id}) + # TODO: convert to temba-toast + # self.assertContains(response, "The following flows are still used by campaigns") + + flow3.refresh_from_db() + self.assertFalse(flow3.is_archived) - # archive it - response = self.client.post(reverse("flows.flow_list"), {"action": "archive", "objects": flow1.id}) + # archive first flow + response = self.client.post(list_url, {"action": "archive", "objects": flow1.id}) self.assertEqual(200, response.status_code) - # flow should no longer appear in list + # should no longer appear in list response = self.client.get(reverse("flows.flow_list")) self.assertNotContains(response, flow1.name) self.assertContains(response, flow3.name) diff --git a/temba/settings_common.py b/temba/settings_common.py index 57c64e08169..b8e8edf1228 100644 --- a/temba/settings_common.py +++ b/temba/settings_common.py @@ -578,11 +578,6 @@ "triggers.trigger.*", ), "Viewers": ( - "campaigns.campaign_list", - "campaigns.campaign_menu", - "campaigns.campaign_read", - "campaigns.campaignevent_list", - "campaigns.campaignevent_read", "channels.channel_list", "channels.channel_read", "channels.channelevent_list", From f7978ca0b99b26c98c10a9ec3ac7207379c947b6 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 14 Jan 2025 15:12:59 +0000 Subject: [PATCH 2/3] Simplifiy creating campaign events for a given campaign --- temba/campaigns/tests/test_eventcrudl.py | 27 +++++++++++++++++++++++- temba/campaigns/views.py | 23 ++++++++++---------- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/temba/campaigns/tests/test_eventcrudl.py b/temba/campaigns/tests/test_eventcrudl.py index f28d6f6b0f0..6c017380f88 100644 --- a/temba/campaigns/tests/test_eventcrudl.py +++ b/temba/campaigns/tests/test_eventcrudl.py @@ -57,6 +57,13 @@ def test_read(self): self.assertContentMenu(read_url, self.admin, ["Delete"]) + # deleted events should redirect to campaign read page + event.is_active = False + event.save(update_fields=("is_active",)) + + response = self.requestView(read_url, self.editor) + self.assertRedirect(response, reverse("campaigns.campaign_read", args=[event.campaign.uuid])) + def test_create(self): farmer1 = self.create_contact("Rob Jasper", phone="+250788111111") farmer2 = self.create_contact("Mike Gordon", phone="+250788222222", language="kin") @@ -72,7 +79,7 @@ def test_create(self): # create a campaign for our farmers group campaign = Campaign.create(self.org, self.admin, "Planting Reminders", farmers) - create_url = f"{reverse('campaigns.campaignevent_create')}?campaign={campaign.id}" + create_url = reverse("campaigns.campaignevent_create", args=[campaign.id]) # update org to use a single flow language self.org.set_flow_languages(self.admin, ["eng"]) @@ -121,6 +128,24 @@ def test_create(self): form_errors={"flow_start_mode": "This field is required.", "flow_to_start": "This field is required."}, ) + # try to create a message event that's too long + self.assertCreateSubmit( + create_url, + self.admin, + { + "relative_to": planting_date.id, + "event_type": "M", + "eng": "x" * 641, + "direction": "A", + "offset": 1, + "unit": "W", + "flow_to_start": "", + "delivery_hour": 13, + "message_start_mode": "I", + }, + form_errors={"__all__": "Translation for 'English' exceeds the 640 character limit."}, + ) + # can create an event with just a eng translation self.assertCreateSubmit( create_url, diff --git a/temba/campaigns/views.py b/temba/campaigns/views.py index 45630ccf58e..7fd0f1bcdf2 100644 --- a/temba/campaigns/views.py +++ b/temba/campaigns/views.py @@ -5,7 +5,9 @@ from django.core.exceptions import ValidationError from django.db.models.functions import Lower from django.http import Http404, HttpResponseRedirect +from django.shortcuts import get_object_or_404 from django.urls import reverse +from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ from temba.contacts.models import ContactField, ContactGroup @@ -129,7 +131,7 @@ def build_context_menu(self, menu): menu.add_modax( _("New Event"), "event-add", - f"{reverse('campaigns.campaignevent_create')}?campaign={obj.id}", + reverse("campaigns.campaignevent_create", args=[obj.id]), as_button=True, ) @@ -661,13 +663,15 @@ def get_context_data(self, **kwargs): context["background_warning"] = CampaignEventCRUDL.BACKGROUND_WARNING return context - def pre_process(self, request, *args, **kwargs): - campaign_id = request.GET.get("campaign", None) - if campaign_id: - campaign = Campaign.objects.filter(id=campaign_id, is_active=True, is_archived=False) + @classmethod + def derive_url_pattern(cls, path, action): + return r"^%s/%s/(?P\d+)/$" % (path, action) - if not campaign.exists(): - raise Http404("Campaign not found") + @cached_property + def campaign(self): + return get_object_or_404( + Campaign, id=self.kwargs["campaign_id"], org=self.request.org, is_active=True, is_archived=False + ) def derive_fields(self): from copy import deepcopy @@ -711,9 +715,6 @@ def post_save(self, obj): def pre_save(self, obj): obj = super().pre_save(obj) - obj.campaign = Campaign.objects.get(org=self.request.org, id=self.request.GET.get("campaign")) + obj.campaign = self.campaign self.form.pre_save(self.request, obj) return obj - - def form_invalid(self, form): - return super().form_invalid(form) From 3c75cf71626dfcf92ea49c83cfbdbc5925aaa321 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 14 Jan 2025 21:39:18 +0000 Subject: [PATCH 3/3] Coverage --- temba/campaigns/tests/test_campaigncrudl.py | 10 ++++++ temba/campaigns/tests/test_eventcrudl.py | 36 +++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/temba/campaigns/tests/test_campaigncrudl.py b/temba/campaigns/tests/test_campaigncrudl.py index b2205720006..f44e5ec2438 100644 --- a/temba/campaigns/tests/test_campaigncrudl.py +++ b/temba/campaigns/tests/test_campaigncrudl.py @@ -57,12 +57,22 @@ def test_create(self): def test_read(self): group = self.create_group("Reporters", contacts=[]) campaign = self.create_campaign(self.org, "Welcomes", group) + registered = self.org.fields.get(key="registered") + CampaignEvent.create_flow_event( + self.org, self.admin, campaign, registered, offset=0, unit="D", flow=self.create_flow("Event2Flow") + ) + CampaignEvent.create_flow_event( + self.org, self.admin, campaign, registered, offset=1, unit="H", flow=self.create_flow("Event3Flow") + ) + read_url = reverse("campaigns.campaign_read", args=[campaign.uuid]) self.assertRequestDisallowed(read_url, [None, self.agent, self.admin2]) response = self.assertReadFetch(read_url, [self.editor, self.admin], context_object=campaign) self.assertContains(response, "Welcomes") self.assertContains(response, "Registered") + self.assertContains(response, "Event2Flow") + self.assertContains(response, "Event3Flow") self.assertContentMenu(read_url, self.admin, ["New Event", "Edit", "Export", "Archive"]) diff --git a/temba/campaigns/tests/test_eventcrudl.py b/temba/campaigns/tests/test_eventcrudl.py index 6c017380f88..8167d9eeaa0 100644 --- a/temba/campaigns/tests/test_eventcrudl.py +++ b/temba/campaigns/tests/test_eventcrudl.py @@ -216,6 +216,42 @@ def test_create(self): # should be redirected back to our campaign read page self.assertRedirect(response, reverse("campaigns.campaign_read", args=[campaign.uuid])) + # also create a flow event for a regular flow + flow1 = self.create_flow("Event Flow 1") + response = self.assertCreateSubmit( + create_url, + self.admin, + { + "relative_to": planting_date.id, + "event_type": "F", + "direction": "B", + "offset": 2, + "unit": "D", + "flow_to_start": flow1.id, + "delivery_hour": 13, + "flow_start_mode": "I", + }, + new_obj_query=CampaignEvent.objects.filter(campaign=campaign, event_type="F", flow=flow1, start_mode="I"), + ) + + # and a flow event for a background flow + flow2 = self.create_flow("Event Flow 2", flow_type=Flow.TYPE_BACKGROUND) + response = self.assertCreateSubmit( + create_url, + self.admin, + { + "relative_to": planting_date.id, + "event_type": "F", + "direction": "B", + "offset": 2, + "unit": "D", + "flow_to_start": flow2.id, + "delivery_hour": 13, + "flow_start_mode": "I", + }, + new_obj_query=CampaignEvent.objects.filter(campaign=campaign, event_type="F", flow=flow2, start_mode="P"), + ) + event = CampaignEvent.objects.get(campaign=campaign, event_type="M", offset=-2) self.assertEqual(-2, event.offset) self.assertEqual(13, event.delivery_hour)