Skip to content

Commit

Permalink
refactor: IP Blacklist
Browse files Browse the repository at this point in the history
  • Loading branch information
s-aga-r committed Jan 4, 2025
1 parent 9315376 commit fd809f8
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 0 deletions.
14 changes: 14 additions & 0 deletions mail/api/blacklist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import frappe
from frappe import _

from mail.mail.doctype.ip_blacklist.ip_blacklist import get_blacklist_for_ip_address


@frappe.whitelist(methods=["GET"], allow_guest=True)
def get(ip_address: str) -> dict:
"""Returns the blacklist for the given IP address."""

if not ip_address:
frappe.throw(_("IP address is required."), frappe.MandatoryError)

return get_blacklist_for_ip_address(ip_address, create_if_not_exists=True, commit=True) or {}
Empty file.
8 changes: 8 additions & 0 deletions mail/mail/doctype/ip_blacklist/ip_blacklist.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and contributors
// For license information, please see license.txt

// frappe.ui.form.on("IP Blacklist", {
// refresh(frm) {

// },
// });
146 changes: 146 additions & 0 deletions mail/mail/doctype/ip_blacklist/ip_blacklist.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
{
"actions": [],
"allow_rename": 1,
"autoname": "field:ip_address",
"creation": "2024-08-14 18:22:35.949979",
"doctype": "DocType",
"engine": "InnoDB",
"field_order": [
"is_blacklisted",
"section_break_vggx",
"ip_address",
"ip_version",
"ip_address_expanded",
"ip_group",
"column_break_0vb9",
"host",
"blacklist_reason",
"section_break_9zf8",
"source_ip_address",
"column_break_iwmw",
"source_host"
],
"fields": [
{
"default": "1",
"fieldname": "is_blacklisted",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Blacklisted",
"no_copy": 1,
"search_index": 1
},
{
"fieldname": "section_break_vggx",
"fieldtype": "Section Break"
},
{
"fieldname": "ip_address",
"fieldtype": "Data",
"in_standard_filter": 1,
"label": "IP Address",
"no_copy": 1,
"reqd": 1,
"set_only_once": 1,
"unique": 1
},
{
"fieldname": "ip_version",
"fieldtype": "Select",
"in_standard_filter": 1,
"label": "IP Version",
"no_copy": 1,
"options": "\nIPv4\nIPv6",
"read_only": 1,
"search_index": 1,
"set_only_once": 1
},
{
"fieldname": "ip_address_expanded",
"fieldtype": "Data",
"label": "IP Address Expanded",
"no_copy": 1,
"read_only": 1,
"set_only_once": 1,
"unique": 1
},
{
"fieldname": "ip_group",
"fieldtype": "Data",
"in_list_view": 1,
"in_standard_filter": 1,
"label": "IP Group",
"no_copy": 1,
"read_only": 1,
"search_index": 1
},
{
"fieldname": "column_break_0vb9",
"fieldtype": "Column Break"
},
{
"fieldname": "host",
"fieldtype": "Data",
"in_list_view": 1,
"in_standard_filter": 1,
"label": "Host",
"no_copy": 1,
"read_only": 1
},
{
"fieldname": "blacklist_reason",
"fieldtype": "Small Text",
"label": "Blacklist Reason"
},
{
"fieldname": "section_break_9zf8",
"fieldtype": "Section Break"
},
{
"fieldname": "source_ip_address",
"fieldtype": "Data",
"label": "Source IP Address",
"no_copy": 1,
"read_only": 1,
"set_only_once": 1
},
{
"fieldname": "column_break_iwmw",
"fieldtype": "Column Break"
},
{
"fieldname": "source_host",
"fieldtype": "Data",
"label": "Source Host",
"no_copy": 1,
"read_only": 1,
"set_only_once": 1
}
],
"index_web_pages_for_search": 1,
"links": [],
"modified": "2024-10-18 18:48:41.941265",
"modified_by": "Administrator",
"module": "Mail",
"name": "IP Blacklist",
"naming_rule": "By fieldname",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"sort_field": "creation",
"sort_order": "DESC",
"states": [],
"track_changes": 1
}
131 changes: 131 additions & 0 deletions mail/mail/doctype/ip_blacklist/ip_blacklist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt

import ipaddress
from typing import Literal

import frappe
from frappe import _
from frappe.model.document import Document

from mail.utils import get_host_by_ip
from mail.utils.cache import delete_cache, get_blacklist_for_ip_group


class IPBlacklist(Document):
def validate(self) -> None:
if self.is_new():
self.set_ip_version()
self.set_ip_address_expanded()
self.set_ip_group()
self.set_source_ip_address()
self.set_source_host()

self.set_host()

def on_update(self) -> None:
delete_cache(f"blacklist|{self.ip_group}")

def set_ip_version(self) -> None:
"""Sets the IP version of the IP address"""

self.ip_version = get_ip_version(self.ip_address)

def set_ip_address_expanded(self) -> None:
"""Sets the expanded version of the IP address"""

self.ip_address_expanded = get_ip_address_expanded(self.ip_version, self.ip_address)

def set_ip_group(self) -> None:
"""Sets the IP group"""

self.ip_group = get_ip_group(self.ip_version, self.ip_address_expanded)

def set_source_ip_address(self) -> None:
"""Sets the source IP address"""

self.source_ip_address = frappe.local.request_ip

def set_source_host(self) -> None:
"""Sets the source host"""

self.source_host = get_host_by_ip(self.source_ip_address)

def set_host(self) -> None:
"""Sets the host for the IP address"""

self.host = get_host_by_ip(self.ip_address_expanded)


def get_ip_version(ip_address: str) -> Literal["IPv4", "IPv6"]:
"""Returns the IP version of the IP address"""

return "IPv6" if ":" in ip_address else "IPv4"


def get_ip_address_expanded(ip_version: Literal["IPv4", "IPv6"], ip_address: str) -> str:
"""Returns the expanded version of the IP address"""

return (
str(ipaddress.IPv6Address(ip_address).exploded)
if ip_version == "IPv6"
else str(ipaddress.IPv4Address(ip_address))
)


def get_ip_group(ip_version: Literal["IPv4", "IPv6"], ip_address: str) -> str:
"""Returns the IP group"""

if ip_version == "IPv6":
return ":".join(ip_address.split(":")[:3])
else:
return ".".join(ip_address.split(".")[:2])


def create_ip_blacklist(
ip_address: str, blacklist_reason: str | None = None, is_blacklisted: bool = True
) -> IPBlacklist | None:
"""Create an IP Blacklist document"""

try:
doc = frappe.new_doc("IP Blacklist")
doc.ip_address = ip_address
doc.blacklist_reason = blacklist_reason
doc.is_blacklisted = is_blacklisted
doc.insert(
ignore_permissions=True,
ignore_if_duplicate=True,
)
return doc
except Exception:
frappe.log_error(title=_("Error creating IP Blacklist"), message=frappe.get_traceback())


def get_blacklist_for_ip_address(
ip_address: str, create_if_not_exists: bool = False, commit: bool = False
) -> dict | None:
"""Returns the blacklist for the IP address"""

ip_version = get_ip_version(ip_address)
ip_address_expanded = get_ip_address_expanded(ip_version, ip_address)
ip_group = get_ip_group(ip_version, ip_address_expanded)

if blacklist_group := get_blacklist_for_ip_group(ip_group):
for blacklist in blacklist_group:
if blacklist["ip_address"] == ip_address:
return blacklist

if not create_if_not_exists:
return

if blacklist := create_ip_blacklist(ip_address, is_blacklisted=False):
if commit:
frappe.db.commit()

return {
"name": blacklist.name,
"is_blacklisted": blacklist.is_blacklisted,
"ip_address": blacklist.ip_address,
"ip_address_expanded": blacklist.ip_address_expanded,
"blacklist_reason": blacklist.blacklist_reason,
}
9 changes: 9 additions & 0 deletions mail/mail/doctype/ip_blacklist/test_ip_blacklist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt

# import frappe
from frappe.tests.utils import FrappeTestCase


class TestIPBlacklist(FrappeTestCase):
pass
20 changes: 20 additions & 0 deletions mail/utils/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,23 @@ def getter() -> str | None:
return frappe.db.get_value("Mailbox", {"user": user, "is_default": 1}, "name")

return _hget_or_hset(f"user|{user}", "default_mailbox", getter)


def get_blacklist_for_ip_group(ip_group: str) -> list:
"""Returns the blacklist for the IP group."""

def getter() -> list:
IP_BLACKLIST = frappe.qb.DocType("IP Blacklist")
return (
frappe.qb.from_(IP_BLACKLIST)
.select(
IP_BLACKLIST.name,
IP_BLACKLIST.is_blacklisted,
IP_BLACKLIST.ip_address,
IP_BLACKLIST.ip_address_expanded,
IP_BLACKLIST.blacklist_reason,
)
.where(IP_BLACKLIST.ip_group == ip_group)
).run(as_dict=True)

return _get_or_set(f"blacklist|{ip_group}", getter, expires_in_sec=24 * 60 * 60)

0 comments on commit fd809f8

Please sign in to comment.