diff --git a/bin/parallel_sync b/bin/parallel_sync index 072b5f88..0a9236fe 100755 --- a/bin/parallel_sync +++ b/bin/parallel_sync @@ -68,8 +68,8 @@ def save_ctid(page: int, row: int, filename: str) -> None: row (int): The row number to save. filename (str): The name of the file to save the checkpoint in. """ - checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid") - with open(checkpoint_file, "w+") as fp: + filepath: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid") + with open(filepath, "w+") as fp: fp.write(f"{page},{row}\n") @@ -83,12 +83,12 @@ def read_ctid(filename: str) -> t.Tuple[t.Optional[int], t.Optional[int]]: Returns: tuple: A tuple containing the page and row numbers. If the checkpoint file does not exist, returns (None, None). """ - checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid") - if os.path.exists(checkpoint_file): - with open(checkpoint_file, "r") as fp: + filepath: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid") + if os.path.exists(filepath): + with open(filepath, "r") as fp: pairs: str = fp.read().split()[0].split(",") - page = int(pairs[0]) - row = int(pairs[1]) + page: int = int(pairs[0]) + row: int = int(pairs[1]) return page, row return None, None @@ -230,7 +230,7 @@ def synchronous( def multithreaded( tasks: t.Generator, doc: dict, - nprocs: t.Optional[int] = None, + nthreads: t.Optional[int] = None, verbose: bool = False, validate: bool = False, ) -> None: @@ -247,12 +247,12 @@ def multithreaded( ) queue.task_done() - nprocs: int = nprocs or 1 + nthreads: int = nthreads or 1 queue: Queue = Queue() sync: Sync = Sync(doc, verbose=verbose, validate=validate) sync.tree.build(sync.nodes) - for _ in range(nprocs): + for _ in range(nthreads): thread: Thread = Thread( target=worker, args=( @@ -273,13 +273,13 @@ def multithreaded( def multiprocess( tasks: t.Generator, doc: dict, - nprocs: t.Optional[int] = None, + ncpus: t.Optional[int] = None, verbose: bool = False, validate: bool = False, ) -> None: sys.stdout.write("Multiprocess\n") task: Task = Task(doc, verbose=verbose, validate=validate) - with ProcessPoolExecutor(max_workers=nprocs) as executor: + with ProcessPoolExecutor(max_workers=ncpus) as executor: try: list(executor.map(task.process, tasks)) except Exception as e: @@ -291,12 +291,12 @@ def multiprocess( def multithreaded_async( tasks: t.Generator, doc: dict, - nprocs: t.Optional[int] = None, + nthreads: t.Optional[int] = None, verbose: bool = False, validate: bool = False, ) -> None: sys.stdout.write("Multi-threaded async\n") - executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=nprocs) + executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=nthreads) event_loop = asyncio.get_event_loop() event_loop.run_until_complete( run_tasks(executor, tasks, doc, verbose=verbose, validate=validate) @@ -308,12 +308,12 @@ def multithreaded_async( def multiprocess_async( tasks: t.Generator, doc: dict, - nprocs: t.Optional[int] = None, + ncpus: t.Optional[int] = None, verbose: bool = False, validate: bool = False, ) -> None: sys.stdout.write("Multi-process async\n") - executor: ProcessPoolExecutor = ProcessPoolExecutor(max_workers=nprocs) + executor: ProcessPoolExecutor = ProcessPoolExecutor(max_workers=ncpus) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete( @@ -428,15 +428,15 @@ def main(config, nprocs, mode, verbose): if mode == "synchronous": synchronous(tasks, document, verbose=verbose) elif mode == "multithreaded": - multithreaded(tasks, document, nprocs=nprocs, verbose=verbose) + multithreaded(tasks, document, nthreads=nprocs, verbose=verbose) elif mode == "multiprocess": - multiprocess(tasks, document, nprocs=nprocs, verbose=verbose) + multiprocess(tasks, document, ncpus=nprocs, verbose=verbose) elif mode == "multithreaded_async": multithreaded_async( - tasks, document, nprocs=nprocs, verbose=verbose + tasks, document, nthreads=nprocs, verbose=verbose ) elif mode == "multiprocess_async": - multiprocess_async(tasks, document, nprocs=nprocs, verbose=verbose) + multiprocess_async(tasks, document, ncpus=nprocs, verbose=verbose) if __name__ == "__main__": diff --git a/requirements/base.txt b/requirements/base.txt index 08db6c7f..008b3af5 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -6,9 +6,9 @@ # async-timeout==4.0.3 # via redis -boto3==1.34.7 +boto3==1.34.8 # via -r requirements/base.in -botocore==1.34.7 +botocore==1.34.8 # via # boto3 # s3transfer diff --git a/requirements/dev.txt b/requirements/dev.txt index b6089e17..17334399 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -8,9 +8,9 @@ async-timeout==4.0.3 # via redis black==23.12.1 # via -r requirements/dev.in -boto3==1.34.7 +boto3==1.34.8 # via -r requirements/base.in -botocore==1.34.7 +botocore==1.34.8 # via # boto3 # s3transfer diff --git a/tests/conftest.py b/tests/conftest.py index ea019f22..96971017 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,7 +4,7 @@ import pytest import sqlalchemy as sa -from sqlalchemy.orm import DeclarativeBase, sessionmaker +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker from sqlalchemy.schema import UniqueConstraint from pgsync.base import Base, create_database, drop_database @@ -99,10 +99,12 @@ def city_cls(base, country_cls): class City(base): __tablename__ = "city" __table_args__ = (UniqueConstraint("name", "country_id"),) - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.String, nullable=False) - country_id = sa.Column(sa.Integer, sa.ForeignKey(country_cls.id)) - country = sa.orm.relationship( + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + name: Mapped[str] = mapped_column(sa.String, nullable=False) + country_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(country_cls.id) + ) + country: Mapped[country_cls] = sa.orm.relationship( country_cls, backref=sa.orm.backref("countries") ) @@ -114,10 +116,12 @@ def country_cls(base, continent_cls): class Country(base): __tablename__ = "country" __table_args__ = (UniqueConstraint("name", "continent_id"),) - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.String, nullable=False) - continent_id = sa.Column(sa.Integer, sa.ForeignKey(continent_cls.id)) - continent = sa.orm.relationship( + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + name: Mapped[str] = mapped_column(sa.String, nullable=False) + continent_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(continent_cls.id) + ) + continent: Mapped[continent_cls] = sa.orm.relationship( continent_cls, backref=sa.orm.backref("continents"), ) @@ -130,8 +134,8 @@ def continent_cls(base): class Continent(base): __tablename__ = "continent" __table_args__ = (UniqueConstraint("name"),) - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.String, nullable=False) + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + name: Mapped[str] = mapped_column(sa.String, nullable=False) return Continent @@ -141,8 +145,8 @@ def publisher_cls(base): class Publisher(base): __tablename__ = "publisher" __table_args__ = (UniqueConstraint("name"),) - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.String, nullable=False) + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + name: Mapped[str] = mapped_column(sa.String, nullable=False) return Publisher @@ -152,11 +156,15 @@ def author_cls(base, city_cls): class Author(base): __tablename__ = "author" __table_args__ = (UniqueConstraint("name"),) - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.String, nullable=False) - birth_year = sa.Column(sa.Integer, nullable=True) - city_id = sa.Column(sa.Integer, sa.ForeignKey(city_cls.id)) - city = sa.orm.relationship(city_cls, backref=sa.orm.backref("city")) + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + name: Mapped[str] = mapped_column(sa.String, nullable=False) + birth_year: Mapped[int] = mapped_column(sa.Integer, nullable=True) + city_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(city_cls.id) + ) + city: Mapped[city_cls] = sa.orm.relationship( + city_cls, backref=sa.orm.backref("city") + ) return Author @@ -166,8 +174,8 @@ def shelf_cls(base): class Shelf(base): __tablename__ = "shelf" __table_args__ = (UniqueConstraint("shelf"),) - id = sa.Column(sa.Integer, primary_key=True) - shelf = sa.Column(sa.String, nullable=False) + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + shelf: Mapped[str] = mapped_column(sa.String, nullable=False) return Shelf @@ -177,8 +185,8 @@ def subject_cls(base): class Subject(base): __tablename__ = "subject" __table_args__ = (UniqueConstraint("name"),) - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.String, nullable=False) + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + name: Mapped[str] = mapped_column(sa.String, nullable=False) return Subject @@ -188,8 +196,8 @@ def language_cls(base): class Language(base): __tablename__ = "language" __table_args__ = (UniqueConstraint("code"),) - id = sa.Column(sa.Integer, primary_key=True) - code = sa.Column(sa.String, nullable=False) + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + code: Mapped[str] = mapped_column(sa.String, nullable=False) return Language @@ -199,8 +207,8 @@ def contact_cls(base): class Contact(base): __tablename__ = "contact" __table_args__ = (UniqueConstraint("name"),) - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.String, nullable=False) + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + name: Mapped[str] = mapped_column(sa.String, nullable=False) return Contact @@ -213,10 +221,12 @@ class ContactItem(base): UniqueConstraint("name"), UniqueConstraint("contact_id"), ) - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.String, nullable=False) - contact_id = sa.Column(sa.Integer, sa.ForeignKey(contact_cls.id)) - contact = sa.orm.relationship( + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + name: Mapped[str] = mapped_column(sa.String, nullable=False) + contact_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(contact_cls.id) + ) + contact: Mapped[contact_cls] = sa.orm.relationship( contact_cls, backref=sa.orm.backref("contacts") ) @@ -231,10 +241,14 @@ class User(base): UniqueConstraint("name"), UniqueConstraint("contact_id"), ) - id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) - name = sa.Column(sa.String, nullable=False) - contact_id = sa.Column(sa.Integer, sa.ForeignKey(contact_cls.id)) - contact = sa.orm.relationship( + id: Mapped[int] = mapped_column( + sa.Integer, primary_key=True, autoincrement=True + ) + name: Mapped[str] = mapped_column(sa.String, nullable=False) + contact_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(contact_cls.id) + ) + contact: Mapped[contact_cls] = sa.orm.relationship( contact_cls, backref=sa.orm.backref("user_contacts") ) @@ -246,25 +260,35 @@ def book_cls(base, publisher_cls, user_cls): class Book(base): __tablename__ = "book" __table_args__ = (UniqueConstraint("isbn"),) - isbn = sa.Column(sa.String, primary_key=True) - title = sa.Column(sa.String, nullable=False) - description = sa.Column(sa.String, nullable=True) - copyright = sa.Column(sa.String, nullable=True) - publisher_id = sa.Column(sa.Integer, sa.ForeignKey(publisher_cls.id)) - publisher = sa.orm.relationship( + isbn: Mapped[str] = mapped_column(sa.String, primary_key=True) + title: Mapped[str] = mapped_column(sa.String, nullable=False) + description: Mapped[str] = mapped_column(sa.String, nullable=True) + copyright: Mapped[str] = mapped_column(sa.String, nullable=True) + publisher_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(publisher_cls.id), nullable=True + ) + publisher: Mapped[publisher_cls] = sa.orm.relationship( publisher_cls, backref=sa.orm.backref("publishers") ) - buyer_id = sa.Column(sa.Integer, sa.ForeignKey(user_cls.id)) - buyer = sa.orm.relationship( - user_cls, backref=sa.orm.backref("buyers"), foreign_keys=[buyer_id] + buyer_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(user_cls.id), nullable=True + ) + buyer: Mapped[user_cls] = sa.orm.relationship( + user_cls, + backref=sa.orm.backref("buyers"), + foreign_keys=[buyer_id], + ) + seller_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(user_cls.id), nullable=True ) - seller_id = sa.Column(sa.Integer, sa.ForeignKey(user_cls.id)) - seller = sa.orm.relationship( + seller: Mapped[user_cls] = sa.orm.relationship( user_cls, backref=sa.orm.backref("sellers"), foreign_keys=[seller_id], ) - tags = sa.Column(sa.dialects.postgresql.JSONB, nullable=True) + tags: Mapped[sa.dialects.postgresql.JSONB] = mapped_column( + sa.dialects.postgresql.JSONB, nullable=True + ) return Book @@ -274,13 +298,17 @@ def book_author_cls(base, book_cls, author_cls): class BookAuthor(base): __tablename__ = "book_author" __table_args__ = (UniqueConstraint("book_isbn", "author_id"),) - id = sa.Column(sa.Integer, primary_key=True) - book_isbn = sa.Column(sa.String, sa.ForeignKey(book_cls.isbn)) - book = sa.orm.relationship( + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + book_isbn: Mapped[str] = mapped_column( + sa.String, sa.ForeignKey(book_cls.isbn) + ) + book: Mapped[book_cls] = sa.orm.relationship( book_cls, backref=sa.orm.backref("book_author_books") ) - author_id = sa.Column(sa.Integer, sa.ForeignKey(author_cls.id)) - author = sa.orm.relationship( + author_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(author_cls.id) + ) + author: Mapped[author_cls] = sa.orm.relationship( author_cls, backref=sa.orm.backref("authors") ) @@ -292,13 +320,17 @@ def book_subject_cls(base, book_cls, subject_cls): class BookSubject(base): __tablename__ = "book_subject" __table_args__ = (UniqueConstraint("book_isbn", "subject_id"),) - id = sa.Column(sa.Integer, primary_key=True) - book_isbn = sa.Column(sa.String, sa.ForeignKey(book_cls.isbn)) - book = sa.orm.relationship( + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + book_isbn: Mapped[str] = mapped_column( + sa.String, sa.ForeignKey(book_cls.isbn) + ) + book: Mapped[book_cls] = sa.orm.relationship( book_cls, backref=sa.orm.backref("book_subject_books") ) - subject_id = sa.Column(sa.Integer, sa.ForeignKey(subject_cls.id)) - subject = sa.orm.relationship( + subject_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(subject_cls.id) + ) + subject: Mapped[subject_cls] = sa.orm.relationship( subject_cls, backref=sa.orm.backref("subjects") ) @@ -310,13 +342,17 @@ def book_language_cls(base, book_cls, language_cls): class BookLanguage(base): __tablename__ = "book_language" __table_args__ = (UniqueConstraint("book_isbn", "language_id"),) - id = sa.Column(sa.Integer, primary_key=True) - book_isbn = sa.Column(sa.String, sa.ForeignKey(book_cls.isbn)) - book = sa.orm.relationship( + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + book_isbn: Mapped[str] = mapped_column( + sa.String, sa.ForeignKey(book_cls.isbn) + ) + book: Mapped[book_cls] = sa.orm.relationship( book_cls, backref=sa.orm.backref("book_language_books") ) - language_id = sa.Column(sa.Integer, sa.ForeignKey(language_cls.id)) - language = sa.orm.relationship( + language_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(language_cls.id) + ) + language: Mapped[language_cls] = sa.orm.relationship( language_cls, backref=sa.orm.backref("languages") ) @@ -328,13 +364,17 @@ def book_shelf_cls(base, book_cls, shelf_cls): class BookShelf(base): __tablename__ = "book_shelf" __table_args__ = (UniqueConstraint("book_isbn", "shelf_id"),) - id = sa.Column(sa.Integer, primary_key=True) - book_isbn = sa.Column(sa.String, sa.ForeignKey(book_cls.isbn)) - book = sa.orm.relationship( + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + book_isbn: Mapped[str] = mapped_column( + sa.String, sa.ForeignKey(book_cls.isbn) + ) + book: Mapped[book_cls] = sa.orm.relationship( book_cls, backref=sa.orm.backref("book_book_shelf_books") ) - shelf_id = sa.Column(sa.Integer, sa.ForeignKey(shelf_cls.id)) - shelf = sa.orm.relationship( + shelf_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(shelf_cls.id) + ) + shelf: Mapped[shelf_cls] = sa.orm.relationship( shelf_cls, backref=sa.orm.backref("shelves") ) @@ -346,12 +386,14 @@ def rating_cls(base, book_cls): class Rating(base): __tablename__ = "rating" __table_args__ = (UniqueConstraint("book_isbn"),) - id = sa.Column(sa.Integer, primary_key=True) - book_isbn = sa.Column(sa.String, sa.ForeignKey(book_cls.isbn)) - book = sa.orm.relationship( + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + book_isbn: Mapped[str] = mapped_column( + sa.String, sa.ForeignKey(book_cls.isbn) + ) + book: Mapped[book_cls] = sa.orm.relationship( book_cls, backref=sa.orm.backref("book_rating_books") ) - value = sa.Column(sa.Float, nullable=True) + value: Mapped[float] = mapped_column(sa.Float, nullable=True) return Rating @@ -361,8 +403,8 @@ def group_cls(base): class Group(base): __tablename__ = "group" __table_args__ = (UniqueConstraint("group_name"),) - id = sa.Column(sa.Integer, primary_key=True) - group_name = sa.Column(sa.String, nullable=False) + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + group_name: Mapped[str] = mapped_column(sa.String, nullable=False) return Group @@ -372,13 +414,17 @@ def book_group_cls(base, book_cls, group_cls): class BookGroup(base): __tablename__ = "book_group" __table_args__ = (UniqueConstraint("book_isbn", "group_id"),) - id = sa.Column(sa.Integer, primary_key=True) - book_isbn = sa.Column(sa.String, sa.ForeignKey(book_cls.isbn)) - book = sa.orm.relationship( + id: Mapped[int] = mapped_column(sa.Integer, primary_key=True) + book_isbn: Mapped[str] = mapped_column( + sa.String, sa.ForeignKey(book_cls.isbn) + ) + book: Mapped[book_cls] = sa.orm.relationship( book_cls, backref=sa.orm.backref("book_book_group_books") ) - group_id = sa.Column(sa.Integer, sa.ForeignKey(group_cls.id)) - group = sa.orm.relationship( + group_id: Mapped[int] = mapped_column( + sa.Integer, sa.ForeignKey(group_cls.id) + ) + group: Mapped[group_cls] = sa.orm.relationship( group_cls, backref=sa.orm.backref("groups"), cascade="all,delete" )