Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update PPR party client code formatting and matching. #1706

Merged
merged 1 commit into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 99 additions & 19 deletions ppr-api/src/ppr_api/models/client_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

from flask import current_app
from sqlalchemy.sql import text

from ppr_api.exceptions import DatabaseException

Expand All @@ -27,6 +28,17 @@
from .db import db


CLIENT_CODE_BRANCH_QUERY = """
select LPAD(cc.id::text, 8, '0') as party_code, cc.id as branch_id, cc.head_id, cc.name, cc.contact_name,
cc.contact_phone_number, cc.contact_area_cd, cc.email_address,
a.street, a.street_additional, a.city, a.region, a.postal_code, a.country
from client_codes cc, addresses a
where LPAD(cc.id::text, 8, '0') like :query_val
and a.id = cc.address_id
order by cc.id
"""


class ClientCode(db.Model): # pylint: disable=too-many-instance-attributes
"""This class maintains client party information (registering and secured parties)."""

Expand Down Expand Up @@ -58,11 +70,11 @@ class ClientCode(db.Model): # pylint: disable=too-many-instance-attributes
def json(self) -> dict:
"""Return the client party branch as a json object."""
party = {
'code': str(self.id),
'code': self.format_party_code(),
'businessName': self.name,
'contact': {
'name': self.contact_name,
'phoneNumber': self.contact_phone_number
'name': self.contact_name if self.contact_name else '',
'phoneNumber': self.contact_phone_number if self.contact_phone_number else ''
}
}
if self.contact_area_cd:
Expand All @@ -75,6 +87,10 @@ def json(self) -> dict:

return party

def format_party_code(self) -> str:
"""Return the client party code in the 8 character format padded with leading zeroes."""
return str(self.id).strip().rjust(8, '0')

@classmethod
def find_by_code(cls, code: str = None):
"""Return a client party branch json object by client code."""
Expand All @@ -83,7 +99,7 @@ def find_by_code(cls, code: str = None):
try:
party = db.session.query(ClientCode).filter(ClientCode.id == int(code)).one_or_none()
except Exception as db_exception: # noqa: B902; return nicer error
current_app.logger.error('DB find_by_code exception: ' + repr(db_exception))
current_app.logger.error('DB find_by_code exception: ' + str(db_exception))
raise DatabaseException(db_exception)

if party:
Expand All @@ -99,9 +115,9 @@ def find_by_head_office_code(cls, head_office_id: str):
party_list = None
try:
party_list = db.session.query(ClientCode).\
filter(ClientCode.head_id == int(head_office_id.strip())).all()
filter(ClientCode.head_id == int(head_office_id.strip())).order_by(ClientCode.id).all()
except Exception as db_exception: # noqa: B902; return nicer error
current_app.logger.error('DB find_by_head_office_code exception: ' + repr(db_exception))
current_app.logger.error('DB find_by_head_office_code exception: ' + str(db_exception))
raise DatabaseException(db_exception)

if not party_list:
Expand All @@ -111,6 +127,31 @@ def find_by_head_office_code(cls, head_office_id: str):

return party_codes

@classmethod
def find_by_head_office_start(cls, head_office_id: str):
"""Return a list of client parties belonging to a head office searching by code or partial code."""
if len(head_office_id.strip()) == 4:
return cls.find_by_head_office_code(head_office_id)
party_codes = []
# Example 111 match on 111 or 1110..1119; 001 match on 1 or 10..19.
base_id: int = int(head_office_id.strip())
start_id = base_id * 10
end_id = start_id + 9
try:
party_list = db.session.query(ClientCode).filter((ClientCode.head_id == base_id) |
(ClientCode.head_id.between(start_id, end_id)))\
.order_by(ClientCode.id).all()
except Exception as db_exception: # noqa: B902; return nicer error
current_app.logger.error('DB find_by_head_office_start exception: ' + str(db_exception))
raise DatabaseException(db_exception)

if not party_list:
return party_codes
for party in party_list:
party_codes.append(party.json)

return party_codes

@classmethod
def find_by_account_id(cls, account_id: str, crown_charge: bool = True):
"""Return a list of client parties searching by account ID using the account id - bcol id mapping table."""
Expand All @@ -135,7 +176,7 @@ def find_by_account_id(cls, account_id: str, crown_charge: bool = True):
for party in party_list:
party_codes.append(party.json)
except Exception as db_exception: # noqa: B902; return nicer error
current_app.logger.error('DB find_by_account_id exception: ' + repr(db_exception))
current_app.logger.error('DB find_by_account_id exception: ' + str(db_exception))
raise DatabaseException(db_exception)
return party_codes

Expand All @@ -146,21 +187,17 @@ def find_by_branch_start(cls, branch_code: str):
if not branch_code or not branch_code.strip().isdigit():
return party_codes
try:
query_num = branch_code.strip()
if int(query_num) < 100: # Exact match
party = cls.find_by_code(query_num)
if party:
party_codes.append(party)
return party_codes

query_num += '%'
party_list = db.session.query(ClientCode).filter(db.cast(ClientCode.id, db.String).like(query_num)).all()
query_num: int = int(branch_code.strip())
query_code = str(query_num) + '%'
current_app.logger.debug(f'branch id matching on {query_code} from branch_code {branch_code}')
party_list = db.session.query(ClientCode).filter(db.cast(ClientCode.id, db.String)
.like(query_code)).order_by(ClientCode.id).all()
if not party_list:
return party_codes
for party in party_list:
party_codes.append(party.json)
except Exception as db_exception: # noqa: B902; return nicer error
current_app.logger.error('DB find_by_branch_start exception: ' + repr(db_exception))
current_app.logger.error('DB find_by_branch_start exception: ' + str(db_exception))
raise DatabaseException(db_exception)

return party_codes
Expand All @@ -184,7 +221,7 @@ def find_by_head_office_name(cls, head_office_name: str = None, is_fuzzy_search:
for party in party_list:
party_codes.append(party.json)
except Exception as db_exception: # noqa: B902; return nicer error
current_app.logger.error('DB find_by_head_office_name exception: ' + repr(db_exception))
current_app.logger.error('DB find_by_head_office_name exception: ' + str(db_exception))
raise DatabaseException(db_exception)
return party_codes

Expand All @@ -195,6 +232,49 @@ def find_by_head_office(cls, name_or_id: str = None, is_fuzzy_search: bool = Fal
return cls.find_by_head_office_name(name_or_id, is_fuzzy_search)

if name_or_id and name_or_id.strip().isdigit():
return cls.find_by_branch_start(name_or_id)
if len(name_or_id) <= 4:
return cls.find_by_head_office_start(name_or_id)
return cls.find_by_code_start(name_or_id)

return cls.find_by_head_office_name(name_or_id)

@classmethod
def find_by_code_start(cls, party_code: str):
"""Return a list of client parties matching on branch ids that start with a query number."""
client_codes = []
if not party_code or not party_code.strip().isdigit():
return client_codes
try:
query_code: str = party_code.strip() + '%'
current_app.logger.debug(f'party code matching on {query_code}')
query = text(CLIENT_CODE_BRANCH_QUERY)
results = db.session.execute(query, {'query_val': query_code})
rows = results.fetchall()
if rows is not None:
for row in rows:
client_code = {
'code': str(row[0]),
'businessName': str(row[3]),
'contact': {
'name': str(row[4]) if row[4] else '',
'phoneNumber': str(row[5]) if row[5] else ''
},
'address': {
'street': str(row[8]) if row[8] else '',
'streetAdditional': str(row[9]) if row[9] else '',
'city': str(row[10]) if row[10] else '',
'region': str(row[11]) if row[11] else '',
'postalCode': str(row[12]) if row[12] else '',
'country': str(row[13]) if row[13] else ''
}
}
if row[6]:
client_code['contact']['areaCode'] = str(row[6])
if row[7]:
client_code['emailAddress'] = str(row[7])
client_codes.append(client_code)
except Exception as db_exception: # noqa: B902; return nicer error
current_app.logger.error('DB find_by_code_start exception: ' + str(db_exception))
raise DatabaseException(db_exception)

return client_codes
6 changes: 5 additions & 1 deletion ppr-api/src/ppr_api/models/party.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def json(self) -> dict: # pylint: disable=too-many-branches
party['partyId'] = self.id

if self.client_code and self.branch_id:
party['code'] = str(self.branch_id)
party['code'] = self.format_party_code()
if self.client_code.name:
party['businessName'] = self.client_code.name

Expand Down Expand Up @@ -168,6 +168,10 @@ def save(self):

return self.json

def format_party_code(self) -> str:
"""Return the client party code in the 8 character format padded with leading zeroes."""
return str(self.branch_id).strip().rjust(8, '0')

@property
def name(self) -> str:
"""Return the full name of the party for comparison."""
Expand Down
2 changes: 1 addition & 1 deletion ppr-api/src/ppr_api/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
Development release segment: .devN
"""

__version__ = '1.1.8' # pylint: disable=invalid-name
__version__ = '1.1.9' # pylint: disable=invalid-name
53 changes: 47 additions & 6 deletions ppr-api/tests/unit/models/test_client_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
]
# testdata pattern is ({description}, {results_size}, {search_value})
TEST_DATA_BRANCH_CODE = [
('No results exact', 0, '020'),
('No results exact', 0, '000'),
('No results start 3', 0, '998'),
('Results start 3', 4, '999'),
('No results start 4', 0, '9998'),
Expand All @@ -56,10 +56,21 @@
('Fuzzy name search exists', 4, 'rbc', True),
('Fuzzy name search does not exist', 0, 'xxx', True),
]
# testdata pattern is ({code}, {formatted_code})
TEST_DATA_FORMAT_CODE = [
(1, '00000001'),
(11, '00000011'),
(111, '00000111'),
(1111, '00001111'),
(11111, '00011111'),
(111111, '00111111'),
(1111111, '01111111'),
(11111111, '11111111')
]


@pytest.mark.parametrize('desc,exists,search_value', TEST_DATA_PARTY_CODE)
def test_find_party_code(session, desc, exists, search_value):
def test_find_by_code(session, desc, exists, search_value):
"""Assert that find client party by code contains all expected elements."""
party = ClientCode.find_by_code(search_value)
if exists:
Expand All @@ -81,7 +92,7 @@ def test_find_party_code(session, desc, exists, search_value):


@pytest.mark.parametrize('desc,results_size,search_value', TEST_DATA_BRANCH_CODE)
def test_find_by_branch_code(session, desc, results_size, search_value):
def test_find_by_branch_start(session, desc, results_size, search_value):
"""Assert that find client parties by branch code matching contains all expected elements."""
parties = ClientCode.find_by_branch_start(search_value)
if results_size > 0:
Expand All @@ -104,6 +115,30 @@ def test_find_by_branch_code(session, desc, results_size, search_value):
assert not parties


@pytest.mark.parametrize('desc,results_size,search_value', TEST_DATA_BRANCH_CODE)
def test_find_by_code_start(session, desc, results_size, search_value):
"""Assert that find client parties by party code start matching contains all expected elements."""
parties = ClientCode.find_by_code_start(search_value)
if results_size > 0:
assert parties
assert len(parties) >= results_size
for party in parties:
assert len(party['code']) == 8
assert party['businessName']
assert party['address']
assert party['address']['street']
assert party['address']['city']
assert party['address']['region']
assert party['address']['postalCode']
assert party['contact']
assert party['contact']['name']
assert party['contact']['areaCode']
assert party['contact']['phoneNumber']
assert party['emailAddress']
else:
assert not parties


@pytest.mark.parametrize('desc,account_id,results_size,crown_charge', TEST_DATA_ACCOUNT_NUMBER)
def test_find_by_account_id(session, desc, account_id, results_size, crown_charge):
"""Assert that find client parties by account id contains all expected elements."""
Expand Down Expand Up @@ -152,10 +187,17 @@ def test_find_by_head_office(session, desc, results_size, search_value, fuzzy_se
assert not parties


@pytest.mark.parametrize('code,formatted_code', TEST_DATA_FORMAT_CODE)
def test_format_party_code(session, code, formatted_code):
"""Assert that client party code formatting is as expected."""
party = ClientCode(id=code)
assert party.format_party_code() == formatted_code


def test_client_party_json(session):
"""Assert that the client party model renders to a json format correctly."""
party = ClientCode(
id=1000,
id=10001,
name='BUSINESS NAME',
contact_name='CONTACT',
contact_area_cd='250',
Expand All @@ -164,7 +206,7 @@ def test_client_party_json(session):
)

party_json = {
'code': '1000',
'code': party.format_party_code(),
'businessName': party.name,
'contact': {
'name': party.contact_name,
Expand All @@ -173,5 +215,4 @@ def test_client_party_json(session):
},
'emailAddress': party.email_id
}

assert party.json == party_json
Loading