Skip to content

Commit

Permalink
use mapped_columns in tests
Browse files: browse the repository at this point in the history
  • Loading branch information
toluaina committed Dec 27, 2023
1 parent c5ee38e commit 1b3f052
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 101 deletions.
40 changes: 20 additions & 20 deletions bin/parallel_sync
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def save_ctid(page: int, row: int, filename: str) -> None:
row (int): The row number to save.
filename (str): The name of the file to save the checkpoint in.
"""
checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid")
with open(checkpoint_file, "w+") as fp:
filepath: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid")
with open(filepath, "w+") as fp:
fp.write(f"{page},{row}\n")


Expand All @@ -83,12 +83,12 @@ def read_ctid(filename: str) -> t.Tuple[t.Optional[int], t.Optional[int]]:
Returns:
tuple: A tuple containing the page and row numbers. If the checkpoint file does not exist, returns (None, None).
"""
checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid")
if os.path.exists(checkpoint_file):
with open(checkpoint_file, "r") as fp:
filepath: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid")
if os.path.exists(filepath):
with open(filepath, "r") as fp:
pairs: str = fp.read().split()[0].split(",")
page = int(pairs[0])
row = int(pairs[1])
page: int = int(pairs[0])
row: int = int(pairs[1])
return page, row
return None, None

Expand Down Expand Up @@ -230,7 +230,7 @@ def synchronous(
def multithreaded(
tasks: t.Generator,
doc: dict,
nprocs: t.Optional[int] = None,
nthreads: t.Optional[int] = None,
verbose: bool = False,
validate: bool = False,
) -> None:
Expand All @@ -247,12 +247,12 @@ def multithreaded(
)
queue.task_done()

nprocs: int = nprocs or 1
nthreads: int = nthreads or 1
queue: Queue = Queue()
sync: Sync = Sync(doc, verbose=verbose, validate=validate)
sync.tree.build(sync.nodes)

for _ in range(nprocs):
for _ in range(nthreads):
thread: Thread = Thread(
target=worker,
args=(
Expand All @@ -273,13 +273,13 @@ def multithreaded(
def multiprocess(
tasks: t.Generator,
doc: dict,
nprocs: t.Optional[int] = None,
ncpus: t.Optional[int] = None,
verbose: bool = False,
validate: bool = False,
) -> None:
sys.stdout.write("Multiprocess\n")
task: Task = Task(doc, verbose=verbose, validate=validate)
with ProcessPoolExecutor(max_workers=nprocs) as executor:
with ProcessPoolExecutor(max_workers=ncpus) as executor:
try:
list(executor.map(task.process, tasks))
except Exception as e:
Expand All @@ -291,12 +291,12 @@ def multiprocess(
def multithreaded_async(
tasks: t.Generator,
doc: dict,
nprocs: t.Optional[int] = None,
nthreads: t.Optional[int] = None,
verbose: bool = False,
validate: bool = False,
) -> None:
sys.stdout.write("Multi-threaded async\n")
executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=nprocs)
executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=nthreads)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(
run_tasks(executor, tasks, doc, verbose=verbose, validate=validate)
Expand All @@ -308,12 +308,12 @@ def multithreaded_async(
def multiprocess_async(
tasks: t.Generator,
doc: dict,
nprocs: t.Optional[int] = None,
ncpus: t.Optional[int] = None,
verbose: bool = False,
validate: bool = False,
) -> None:
sys.stdout.write("Multi-process async\n")
executor: ProcessPoolExecutor = ProcessPoolExecutor(max_workers=nprocs)
executor: ProcessPoolExecutor = ProcessPoolExecutor(max_workers=ncpus)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
Expand Down Expand Up @@ -428,15 +428,15 @@ def main(config, nprocs, mode, verbose):
if mode == "synchronous":
synchronous(tasks, document, verbose=verbose)
elif mode == "multithreaded":
multithreaded(tasks, document, nprocs=nprocs, verbose=verbose)
multithreaded(tasks, document, nthreads=nprocs, verbose=verbose)
elif mode == "multiprocess":
multiprocess(tasks, document, nprocs=nprocs, verbose=verbose)
multiprocess(tasks, document, ncpus=nprocs, verbose=verbose)
elif mode == "multithreaded_async":
multithreaded_async(
tasks, document, nprocs=nprocs, verbose=verbose
tasks, document, nthreads=nprocs, verbose=verbose
)
elif mode == "multiprocess_async":
multiprocess_async(tasks, document, nprocs=nprocs, verbose=verbose)
multiprocess_async(tasks, document, ncpus=nprocs, verbose=verbose)


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#
async-timeout==4.0.3
# via redis
boto3==1.34.7
boto3==1.34.8
# via -r requirements/base.in
botocore==1.34.7
botocore==1.34.8
# via
# boto3
# s3transfer
Expand Down
4 changes: 2 additions & 2 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ async-timeout==4.0.3
# via redis
black==23.12.1
# via -r requirements/dev.in
boto3==1.34.7
boto3==1.34.8
# via -r requirements/base.in
botocore==1.34.7
botocore==1.34.8
# via
# boto3
# s3transfer
Expand Down
Loading

0 comments on commit 1b3f052

Please sign in to comment.