Skip to content

Commit

Permalink
8721 refactor nr day job to fix event loop already running issue (#1186)
Browse files Browse the repository at this point in the history
  • Loading branch information
vysakh-menon-aot authored Sep 15, 2021
1 parent a91f37c commit b9d9277
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 21 deletions.
2 changes: 1 addition & 1 deletion api/namex/models/name.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def update_nr_name_search(mapper, connection, target):
current_app.logger.debug('name_consume_history.added {}'.format(nr.nrNum))
if len(name_consume_history.added):
# Adding an after_flush_postexec to avoid connection and transaction closed issue's
# It registrars and executes only once, so its only for the current session
# Creating one time execution event when ever corpNum is added to a name
# corpNum sets from nro-extractor job
@event.listens_for(db.session, 'after_flush_postexec', once=True)
def receive_after_flush_postexec(session, flush_context):
Expand Down
37 changes: 17 additions & 20 deletions jobs/nr-day-job/nr_day_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""s2i based launch script to run the service."""
import asyncio
import os
import time
import uuid
from datetime import datetime, timezone

from flask import Flask, current_app
from namex.models import Request, State, db
from namex.services.queue import QueueService
from namex.services import queue
from queue_common.messages import create_cloud_event_msg
from sqlalchemy import text

Expand All @@ -34,6 +33,8 @@ def create_app():
"""Return a configured Flask App using the Factory method."""
app = Flask(__name__)
app.config.from_object(APP_CONFIG)

queue.init_app(app)
db.init_app(app)

register_shellcontext(app)
Expand All @@ -50,15 +51,14 @@ def shell_context():
app.shell_context_processor(shell_context)


async def publish_email_message(qsm: QueueService, payload: dict): # pylint: disable=redefined-outer-name
def publish_email_message(payload: dict):
"""Publish the email message onto the NATS emailer subject."""
subject = APP_CONFIG.NATS_EMAILER_SUBJECT
current_app.logger.debug('publish to queue, subject:%s, event:%s', subject, payload)
await qsm.publish_json_to_subject(payload, subject)
queue.publish_json_to_subject_sync(payload, subject)


async def furnish_request_message(
qsm: QueueService,
def furnish_request_message(
request: Request,
option: str
): # pylint: disable=redefined-outer-name
Expand All @@ -78,7 +78,7 @@ async def furnish_request_message(
}
)
current_app.logger.debug('About to publish email for %s nrNum=%s', option, request.nrNum)
await publish_email_message(qsm, payload)
publish_email_message(payload)

if option == 'before-expiry':
request.notifiedBeforeExpiry = True
Expand All @@ -88,10 +88,10 @@ async def furnish_request_message(
request.save_to_db()


async def notify_nr_before_expiry(app: Flask, qsm: QueueService): # pylint: disable=redefined-outer-name
def notify_nr_before_expiry():
"""Send nr before expiry."""
try:
app.logger.debug('entering notify_nr_before_expiry')
current_app.logger.debug('entering notify_nr_before_expiry')

where_clause = text(
"expiration_date - interval '14 day' <= CURRENT_DATE AND expiration_date > CURRENT_DATE")
Expand All @@ -101,15 +101,15 @@ async def notify_nr_before_expiry(app: Flask, qsm: QueueService): # pylint: dis
where_clause
).all()
for request in requests:
await furnish_request_message(qsm, request, 'before-expiry')
furnish_request_message(request, 'before-expiry')
except Exception as err: # noqa B902; pylint: disable=W0703;
app.logger.error(err)
current_app.logger.error(err)


async def notify_nr_expired(app: Flask, qsm: QueueService): # pylint: disable=redefined-outer-name
def notify_nr_expired():
"""Send nr expired."""
try:
app.logger.debug('entering notify_nr_expired')
current_app.logger.debug('entering notify_nr_expired')

where_clause = text('expiration_date <= CURRENT_DATE')
requests = db.session.query(Request).filter(
Expand All @@ -119,9 +119,9 @@ async def notify_nr_expired(app: Flask, qsm: QueueService): # pylint: disable=r
where_clause
).all()
for request in requests:
await furnish_request_message(qsm, request, 'expired')
furnish_request_message(request, 'expired')
except Exception as err: # noqa B902; pylint: disable=W0703;
app.logger.error(err)
current_app.logger.error(err)


if __name__ == '__main__':
Expand All @@ -130,8 +130,5 @@ async def notify_nr_expired(app: Flask, qsm: QueueService): # pylint: disable=r

application = create_app()
with application.app_context():
event_loop = asyncio.get_event_loop()
qsm = QueueService(app=application, loop=event_loop)

event_loop.run_until_complete(notify_nr_before_expiry(application, qsm))
event_loop.run_until_complete(notify_nr_expired(application, qsm))
notify_nr_before_expiry()
notify_nr_expired()

0 comments on commit b9d9277

Please sign in to comment.