Skip to content

Commit

Permalink
Merge pull request #19 from 8848digital/connection_pool_postgres_v2
Browse files Browse the repository at this point in the history
Connection pool postgres v2
  • Loading branch information
aasif-patel authored Oct 18, 2024
2 parents 7770d62 + 1ad6f97 commit 5363dba
Showing 1 changed file with 58 additions and 3 deletions.
61 changes: 58 additions & 3 deletions frappe/database/postgres/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import re

import psycopg2
from psycopg2 import pool
import threading
import psycopg2.extensions
from psycopg2.errorcodes import (
CLASS_INTEGRITY_CONSTRAINT_VIOLATION,
Expand Down Expand Up @@ -121,6 +123,50 @@ def is_db_table_size_limit(e) -> bool:
def is_interface_error(e):
return isinstance(e, InterfaceError)

class ConnectionPool:
_connection_pool = None
_lock = threading.Lock()

@classmethod
def _initialize(cls, conn_settings = {}):
if not cls._connection_pool:
with cls._lock:
if cls._connection_pool is None and conn_settings:
print("Initializing Connection Pool")
cls._connection_pool = pool.ThreadedConnectionPool(
minconn=5,
maxconn=100,
**conn_settings
)
return cls._connection_pool

@classmethod
def get_connection(cls, conn_settings = {}):
if not cls._connection_pool:
cls._initialize(conn_settings)
try:
conn = cls._connection_pool.getconn()
except Exception as e:
conn = psycopg2.connect(**conn_settings)
return conn

@classmethod
def put_connection(cls, conn):
try:
cls._connection_pool.putconn(conn)
except Exception as e:
conn.close()

@classmethod
def get_connection_pool(cls, conn_settings = {}):
if not cls._connection_pool:
cls._initialize(conn_settings)
return cls._connection_pool

@classmethod
def close_all_connections(cls):
if cls._connection_pool:
cls._connection_pool.closeall()

class PostgresDatabase(PostgresExceptionUtil, Database):
REGEX_CHARACTER = "~"
Expand Down Expand Up @@ -168,6 +214,16 @@ def setup_type_map(self):
@property
def last_query(self):
return LazyDecode(self._cursor.query)

def close(self):
"""Close database connection."""
if self._conn:
ConnectionPool.put_connection(self._conn)
self._cursor = None
self._conn = None

def close_all_connections(self):
ConnectionPool.close_all_connections()

def get_connection(self):
conn_settings = {
Expand All @@ -178,10 +234,9 @@ def get_connection(self):
}
if self.port:
conn_settings["port"] = self.port

conn = psycopg2.connect(**conn_settings)
conn = ConnectionPool.get_connection(conn_settings)
conn.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ)

return conn

def set_execution_timeout(self, seconds: int):
Expand Down

0 comments on commit 5363dba

Please sign in to comment.