Skip to content

Commit

Permalink
move nthreads_polldb to class attr
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Dec 23, 2023
1 parent f771ff7 commit d9c445a
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 20 deletions.
15 changes: 11 additions & 4 deletions pgsync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
verbose: bool = False,
validate: bool = True,
repl_slots: bool = True,
nthreads_polldb: int = 1,
**kwargs,
) -> None:
"""Constructor."""
Expand All @@ -89,6 +90,7 @@ def __init__(
self._checkpoint: int = None
self._plugins: Plugins = None
self._truncate: bool = False
self.nthreads_polldb: int = nthreads_polldb
self._checkpoint_file: str = os.path.join(
settings.CHECKPOINT_PATH, f".{self.__name}"
)
Expand Down Expand Up @@ -1273,7 +1275,7 @@ def _status(self, label: str) -> None:
)
sys.stdout.flush()

def receive(self, nthreads_polldb: int) -> None:
def receive(self) -> None:
"""
Receive events from db.
Expand Down Expand Up @@ -1301,7 +1303,7 @@ def receive(self, nthreads_polldb: int) -> None:
else:
# start a background worker producer thread to poll the db and
# populate the Redis cache
for _ in range(nthreads_polldb):
for _ in range(self.nthreads_polldb):
self.poll_db()
# sync up to current transaction_id
self.pull()
Expand Down Expand Up @@ -1452,10 +1454,15 @@ def main(

else:
for document in config_loader(config):
sync: Sync = Sync(document, verbose=verbose, **kwargs)
sync: Sync = Sync(
document,
verbose=verbose,
nthreads_polldb=nthreads_polldb,
**kwargs,
)
sync.pull()
if daemon:
sync.receive(nthreads_polldb)
sync.receive()
sync.join()


Expand Down
7 changes: 3 additions & 4 deletions tests/test_sync_nested_children.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import pytest

from pgsync.base import subtransactions
from pgsync.settings import NTHREADS_POLLDB
from pgsync.singleton import Singleton
from pgsync.sync import Sync

Expand Down Expand Up @@ -988,7 +987,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

txmin = sync.checkpoint
Expand Down Expand Up @@ -1234,7 +1233,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = [sort_list(doc) for doc in sync.sync()]
Expand Down Expand Up @@ -2244,7 +2243,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

txmin = sync.checkpoint
Expand Down
7 changes: 3 additions & 4 deletions tests/test_sync_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
NodeAttributeError,
TableNotInNodeError,
)
from pgsync.settings import NTHREADS_POLLDB
from pgsync.singleton import Singleton
from pgsync.sync import Sync

Expand Down Expand Up @@ -497,7 +496,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = search(sync.search_client, "testdb")
Expand Down Expand Up @@ -645,7 +644,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = search(sync.search_client, "testdb")
Expand Down Expand Up @@ -705,7 +704,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = search(sync.search_client, "testdb")
Expand Down
7 changes: 3 additions & 4 deletions tests/test_sync_single_child_fk_on_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import psycopg2
import pytest

from pgsync import settings
from pgsync.base import subtransactions
from pgsync.exc import (
ForeignKeyError,
Expand Down Expand Up @@ -770,7 +769,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(settings.NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = search(sync.search_client, "testdb")
Expand Down Expand Up @@ -1073,7 +1072,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(settings.NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = search(sync.search_client, "testdb")
Expand Down Expand Up @@ -1193,7 +1192,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(settings.NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = search(sync.search_client, "testdb")
Expand Down
7 changes: 3 additions & 4 deletions tests/test_sync_single_child_fk_on_parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
RelationshipVariantError,
)
from pgsync.node import Tree
from pgsync.settings import NTHREADS_POLLDB
from pgsync.singleton import Singleton
from pgsync.sync import Sync

Expand Down Expand Up @@ -773,7 +772,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = search(sync.search_client, "testdb")
Expand Down Expand Up @@ -1068,7 +1067,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = search(sync.search_client, "testdb")
Expand Down Expand Up @@ -1187,7 +1186,7 @@ def poll_db():
"pgsync.sync.Sync.status",
side_effect=noop,
):
sync.receive(NTHREADS_POLLDB)
sync.receive()
sync.search_client.refresh("testdb")

docs = search(sync.search_client, "testdb")
Expand Down

0 comments on commit d9c445a

Please sign in to comment.