Skip to content

Commit

Permalink
more cleanup and documentatiion
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Dec 5, 2023
1 parent 5a84436 commit 14fb055
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 40 deletions.
2 changes: 1 addition & 1 deletion examples/node/README
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Demonstrates Adjacency List Relationships

- https://docs.sqlalchemy.org/en/14/orm/self_referential.html
- https://docs.sqlalchemy.org/en/20/orm/self_referential.html
45 changes: 15 additions & 30 deletions pgsync/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,13 @@ def _can_create_replication_slot(self, slot_name: str) -> None:

try:
self.create_replication_slot(slot_name)

except Exception as e:
logger.exception(f"{e}")
raise ReplicationSlotError(
f'PG_USER "{self.engine.url.username}" needs to be '
f"superuser or have permission to read, create and destroy "
f"replication slots to perform this action."
f"replication slots to perform this action.\n{e}"
)
else:
self.drop_replication_slot(slot_name)
Expand Down Expand Up @@ -394,26 +395,28 @@ def create_replication_slot(self, slot_name: str) -> None:
SELECT * FROM PG_REPLICATION_SLOTS
"""
logger.debug(f"Creating replication slot: {slot_name}")
return self.fetchone(
sa.select("*").select_from(
sa.func.PG_CREATE_LOGICAL_REPLICATION_SLOT(
slot_name,
PLUGIN,
try:
self.execute(
sa.select("*").select_from(
sa.func.PG_CREATE_LOGICAL_REPLICATION_SLOT(
slot_name,
PLUGIN,
)
)
),
label="create_replication_slot",
)
)
except Exception as e:
logger.exception(f"{e}")
raise

def drop_replication_slot(self, slot_name: str) -> None:
"""Drop a replication slot."""
logger.debug(f"Dropping replication slot: {slot_name}")
if self.replication_slots(slot_name):
try:
return self.fetchone(
self.execute(
sa.select("*").select_from(
sa.func.PG_DROP_REPLICATION_SLOT(slot_name),
),
label="drop_replication_slot",
)
)
except Exception as e:
logger.exception(f"{e}")
Expand Down Expand Up @@ -841,15 +844,6 @@ def fetchone(
with self.engine.connect() as conn:
return conn.execute(statement).fetchone()

# conn = self.engine.connect()
# try:
# row = conn.execute(statement).fetchone()
# conn.close()
# except Exception as e:
# logger.exception(f"Exception {e}")
# raise
# return row

def fetchall(
self,
statement: sa.sql.Select,
Expand All @@ -863,15 +857,6 @@ def fetchall(
with self.engine.connect() as conn:
return conn.execute(statement).fetchall()

# conn = self.engine.connect()
# try:
# rows = conn.execute(statement).fetchall()
# conn.close()
# except Exception as e:
# logger.exception(f"Exception {e}")
# raise
# return rows

def fetchmany(
self,
statement: sa.sql.Select,
Expand Down
4 changes: 2 additions & 2 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#
async-timeout==4.0.3
# via redis
boto3==1.33.6
boto3==1.33.8
# via -r requirements/base.in
botocore==1.33.6
botocore==1.33.8
# via
# boto3
# s3transfer
Expand Down
8 changes: 4 additions & 4 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ async-timeout==4.0.3
# via redis
black==23.11.0
# via -r requirements/dev.in
boto3==1.33.6
boto3==1.33.8
# via -r requirements/base.in
botocore==1.33.6
botocore==1.33.8
# via
# boto3
# s3transfer
Expand Down Expand Up @@ -51,7 +51,7 @@ filelock==3.13.1
# via virtualenv
flake8==6.1.0
# via -r requirements/dev.in
freezegun==1.3.0
freezegun==1.3.1
# via -r requirements/dev.in
greenlet==3.0.1
# via sqlalchemy
Expand Down Expand Up @@ -88,7 +88,7 @@ packaging==23.2
# pytest
pathspec==0.11.2
# via black
platformdirs==4.0.0
platformdirs==4.1.0
# via
# black
# virtualenv
Expand Down
4 changes: 1 addition & 3 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,7 @@ def test_replication_slots(self, connection):
@patch("pgsync.base.logger")
def test_create_replication_slot(self, mock_logger, connection):
pg_base = Base(connection.engine.url.database)
row = pg_base.create_replication_slot("slot_name")
assert row[0] == "slot_name"
assert row[1] is not None
pg_base.create_replication_slot("slot_name")
pg_base.drop_replication_slot("slot_name")
calls = [
call("Creating replication slot: slot_name"),
Expand Down

0 comments on commit 14fb055

Please sign in to comment.