Skip to content

Commit

Permalink
Send high priority, transactional messages before bulk messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonkagwi committed Nov 13, 2024
1 parent 5cb78a2 commit ad00b81
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 209 deletions.
32 changes: 9 additions & 23 deletions src/smpp_gateway/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(
backend: Backend,
hc_worker: HealthchecksIoWorker,
submit_sm_params: dict,
listen_transactional_mt_messages_only: bool,
set_priority_flag: bool,
*args,
**kwargs,
):
Expand All @@ -78,9 +78,7 @@ def __init__(
self.backend = backend
self.hc_worker = hc_worker
self.submit_sm_params = submit_sm_params
self.listen_transactional_mt_messages_only = (
listen_transactional_mt_messages_only
)
self.set_priority_flag = set_priority_flag
super().__init__(*args, **kwargs)
self._pg_conn = pg_listen(self.backend.name)

Expand Down Expand Up @@ -173,24 +171,15 @@ def receive_pg_notify(self):
if self._pg_conn.notifies:
notify = self._pg_conn.notifies.pop()
logger.info(f"Got NOTIFY:{notify}")
if self.listen_transactional_mt_messages_only:
if notify.payload.isdigit():
self.send_mt_messages(
extra_filter={
"id": int(notify.payload),
"is_transactional": True,
}
)
else:
self.send_mt_messages()
self.send_mt_messages()

def send_mt_messages(self, extra_filter=None):
smses = get_mt_messages_to_send(
limit=100, backend=self.backend, extra_filter=extra_filter
)
def send_mt_messages(self):
smses = get_mt_messages_to_send(limit=100, backend=self.backend)
submit_sm_resps = []
for sms in smses:
params = {**self.submit_sm_params, **sms["params"]}
if self.set_priority_flag and sms["priority_flag"] is not None:
params["priority_flag"] = sms["priority_flag"]
pdus = self.split_and_send_message(sms["short_message"], **params)
# Create placeholder MTMessageStatus objects in the DB, which
# the message_sent handler will later update with the actual command_status
Expand Down Expand Up @@ -238,10 +227,7 @@ def split_and_send_message(self, message, **kwargs):
def listen(self, ignore_error_codes=None, auto_send_enquire_link=True):
self.logger.info("Entering main listen loop")
# Look for and send up to 100 messages on start up
extra_messages_filter = None
if self.listen_transactional_mt_messages_only:
extra_messages_filter = {"is_transactional": True}
self.send_mt_messages(extra_filter=extra_messages_filter)
self.send_mt_messages()
while True:
# When either main socket has data or _pg_conn has data, select.select will return
rlist, _, _ = select.select(
Expand All @@ -252,7 +238,7 @@ def listen(self, ignore_error_codes=None, auto_send_enquire_link=True):
pdu = smpplib.smpp.make_pdu("enquire_link", client=self)
self.send_pdu(pdu)
# Look for and send up to 100 messages every 5 seconds
self.send_mt_messages(extra_filter=extra_messages_filter)
self.send_mt_messages()
continue
elif not rlist:
# backwards-compatible with existing behavior
Expand Down
8 changes: 5 additions & 3 deletions src/smpp_gateway/management/commands/smpp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ def add_arguments(self, parser):
"If set, --hc-ping-key must also be set.",
)
parser.add_argument(
"--listen-transactional-mt-messages-only",
"--set-priority-flag",
action=argparse.BooleanOptionalAction,
default=False,
help="Only listen for transactional MT messages and don't do bulk "
"sending.",
help="Whether to set the `priority_flag` param in the PDU, if one "
"is provided for a message. If a priority_flag is included in "
"--submit-sm-params, the priority_flag set on the individual "
"message will take precedence.",
)

def handle(self, *args, **options):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 4.2.16 on 2024-11-12 22:35

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("smpp_gateway", "0008_mtmessage_is_transactional"),
]

operations = [
migrations.RemoveField(
model_name="mtmessage",
name="is_transactional",
),
migrations.AddField(
model_name="mtmessage",
name="priority_flag",
field=models.IntegerField(
choices=[
(0, "Level 0 (lowest) priority"),
(1, "Level 1 priority"),
(2, "Level 2 priority"),
(3, "Level 3 (highest) priority"),
],
null=True,
),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Generated by Django 4.2.16 on 2024-11-13 17:03

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("smpp_gateway", "0009_remove_mtmessage_is_transactional_and_more"),
]

operations = [
migrations.RemoveIndex(
model_name="mtmessage",
name="mt_message_status_idx",
),
migrations.AlterField(
model_name="mtmessage",
name="priority_flag",
field=models.IntegerField(
choices=[
(0, "Level 0 (lowest) priority"),
(1, "Level 1 priority"),
(2, "Level 2 priority"),
(3, "Level 3 (highest) priority"),
],
null=True,
verbose_name="priority flag",
),
),
migrations.AddIndex(
model_name="mtmessage",
index=models.Index(
models.F("status"),
models.OrderBy(
models.F("priority_flag"), descending=True, nulls_last=True
),
condition=models.Q(("status", "new")),
name="mt_message_status_idx",
),
),
]
20 changes: 14 additions & 6 deletions src/smpp_gateway/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,30 @@ class Status(models.TextChoices):
DELIVERED = "delivered", _("Delivered")
ERROR = "error", _("Error")

class PriorityFlag(models.IntegerChoices):
# Based on the priority_flag values in the SMPP Spec
# https://smpp.org/SMPP_v3_4_Issue1_2.pdf
LEVEL_0 = 0, _("Level 0 (lowest) priority")
LEVEL_1 = 1, _("Level 1 priority")
LEVEL_2 = 2, _("Level 2 priority")
LEVEL_3 = 3, _("Level 3 (highest) priority")

backend = models.ForeignKey(
Backend, on_delete=models.PROTECT, verbose_name=_("backend")
)
# SMPP client will decide how to encode it
short_message = models.TextField(_("short message"))
params = models.JSONField(_("params"))
status = models.CharField(_("status"), max_length=32, choices=Status.choices)
is_transactional = models.BooleanField(null=True)
priority_flag = models.IntegerField(
_("priority flag"), choices=PriorityFlag.choices, null=True
)

def save(self, *args, **kwargs):
super().save(*args, **kwargs)
if self.status == MTMessage.Status.NEW:
notify_sql = f"NOTIFY {self.backend.name}"
if self.is_transactional:
notify_sql += f", '{self.pk}'"
with connection.cursor() as curs:
curs.execute(notify_sql)
curs.execute(f"NOTIFY {self.backend.name}")

def __str__(self):
return f"{self.short_message} ({self.id})"
Expand All @@ -110,7 +117,8 @@ class Meta:
indexes = (
models.Index(
# Allow for quick filtering of messages that need to be processed
fields=["status"],
"status",
models.F("priority_flag").desc(nulls_last=True),
name="mt_message_status_idx",
condition=models.Q(status="new"), # No way to access Status.NEW here?
),
Expand Down
17 changes: 3 additions & 14 deletions src/smpp_gateway/outgoing.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,16 @@ def prepare_request(self, id_, text, identities, context):
"short_message": text,
"params": params,
"status": MTMessage.Status.NEW,
"is_transactional": context.get("is_transactional"),
"priority_flag": context.get("priority_flag"),
}

def send(self, id_, text, identities, context=None):
logger.debug("Sending message: %s", text)
import pprint

print("context:", pprint.pformat(context))
context = context or {}
kwargs_generator = self.prepare_request(id_, text, identities, context)
for kwargs_group in grouper(kwargs_generator, self.send_group_size):
messages = MTMessage.objects.bulk_create(
MTMessage.objects.bulk_create(
[MTMessage(**kwargs) for kwargs in kwargs_group]
)
print("notifying", self.model.name)
notify_sql = f"NOTIFY {self.model.name}"
with connection.cursor() as curs:
if context.get("is_transactional"):
# Notify for each message, including the message ID as the payload.
# For transactional messages there should be only one message anyway
for message in messages:
curs.execute(notify_sql + f", '{message.pk}'")
else:
curs.execute(notify_sql)
curs.execute(f"NOTIFY {self.model.name}")
24 changes: 9 additions & 15 deletions src/smpp_gateway/queries.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging

from typing import Any, Union
from typing import Any

import psycopg2.extensions

from django.db import connection, transaction
from django.db.models import QuerySet
from django.db.models import F, QuerySet
from rapidsms.models import Backend

from smpp_gateway.models import MOMessage, MTMessage
Expand Down Expand Up @@ -38,23 +38,17 @@ def pg_notify(channel: str):
cursor.execute(f"NOTIFY {channel};")


def get_mt_messages_to_send(
limit: int, backend: Backend, extra_filter: Union[dict, None] = None
) -> list[dict[str, Any]]:
def get_mt_messages_to_send(limit: int, backend: Backend) -> list[dict[str, Any]]:
"""Fetches up to `limit` messages intended for `backend`, updates their
status to SENDING, and returns select fields from the model. If `extra_filter`
is provided, it is used to further filter the queryset.
status to SENDING, and returns select fields from the model. The messages
are sorted by descending `priority_flag`.
"""
with transaction.atomic():
queryset = MTMessage.objects.filter(
status=MTMessage.Status.NEW, backend=backend
)
if extra_filter:
queryset = queryset.filter(**extra_filter)
smses = list(
queryset.select_for_update(skip_locked=True).values(
"id", "short_message", "params"
)[:limit]
MTMessage.objects.filter(status=MTMessage.Status.NEW, backend=backend)
.select_for_update(skip_locked=True)
.order_by(F("priority_flag").desc(nulls_last=True))
.values("id", "short_message", "params", "priority_flag")[:limit]
)
if smses:
pks = [sms["id"] for sms in smses]
Expand Down
35 changes: 35 additions & 0 deletions src/smpp_gateway/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from rapidsms.messages.incoming import IncomingMessage
from rapidsms.messages.outgoing import OutgoingMessage
from rapidsms.router.blocking import BlockingRouter

from smpp_gateway.models import MTMessage


class PriorityIncomingMessage(IncomingMessage):
def respond(self, text, **kwargs):
fields = kwargs.get("fields", {})
if "priority_flag" not in fields:
fields["priority_flag"] = MTMessage.PriorityFlag.LEVEL_2.value
kwargs["fields"] = fields
return super().respond(text, **kwargs)


class PriorityOutgoingMessage(OutgoingMessage):
def extra_backend_context(self):
context = super().extra_backend_context()
context["priority_flag"] = self.fields.get(
"priority_flag", MTMessage.PriorityFlag.LEVEL_2.value
)
return context


class PriorityBlockingRouter(BlockingRouter):
def new_incoming_message(self, text, connections, class_=IncomingMessage, **kwargs):
return super().new_incoming_message(
text, connections, class_=PriorityIncomingMessage, **kwargs
)

def new_outgoing_message(self, text, connections, class_=OutgoingMessage, **kwargs):
return super().new_incoming_message(
text, connections, class_=PriorityOutgoingMessage, **kwargs
)
6 changes: 3 additions & 3 deletions src/smpp_gateway/smpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def get_smpplib_client(
notify_mo_channel: str,
backend: Backend,
submit_sm_params: dict,
listen_transactional_mt_messages_only: bool,
set_priority_flag: bool,
hc_check_uuid: str,
hc_ping_key: str,
hc_check_slug: str,
Expand All @@ -35,7 +35,7 @@ def get_smpplib_client(
backend,
hc_worker,
submit_sm_params,
listen_transactional_mt_messages_only,
set_priority_flag,
host,
port,
allow_unknown_opt_params=True,
Expand Down Expand Up @@ -69,7 +69,7 @@ def start_smpp_client(options):
options["notify_mo_channel"],
backend,
json.loads(options["submit_sm_params"]),
options["listen_transactional_mt_messages_only"],
options["set_priority_flag"],
options["hc_check_uuid"],
options["hc_ping_key"],
options["hc_check_slug"],
Expand Down
Loading

0 comments on commit ad00b81

Please sign in to comment.