MinhashBuildIndex fails with StopIteration when initializing priority queue #333

Open
nelson-liu opened this issue Jan 30, 2025 · 1 comment · May be fixed by #334

Comments

@nelson-liu

I get this error when running a simple pipeline with MinhashBuildIndex:

  File "/tmp/ray/session_2025-01-28_00-11-53_869524_544/runtime_resources/pip/423edced06de87e59e89d62d04d06ae3a96700c1/virtualenv/lib/python3.11/site-packages/datatrove/executor/base.py", line 109, in _run_for_rank
    raise e
  File "/tmp/ray/session_2025-01-28_00-11-53_869524_544/runtime_resources/pip/423edced06de87e59e89d62d04d06ae3a96700c1/virtualenv/lib/python3.11/site-packages/datatrove/executor/base.py", line 90, in _run_for_rank
    pipelined_data = pipeline_step(pipelined_data, rank, self.world_size)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ray/session_2025-01-28_00-11-53_869524_544/runtime_resources/pip/423edced06de87e59e89d62d04d06ae3a96700c1/virtualenv/lib/python3.11/site-packages/datatrove/pipeline/base.py", line 119, in __call__
    return self.run(data, rank, world_size)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ray/session_2025-01-28_00-11-53_869524_544/runtime_resources/pip/423edced06de87e59e89d62d04d06ae3a96700c1/virtualenv/lib/python3.11/site-packages/datatrove/pipeline/dedup/minhash.py", line 679, in run
    pq = [next(sig_reader) for sig_reader in sig_readers]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ray/session_2025-01-28_00-11-53_869524_544/runtime_resources/pip/423edced06de87e59e89d62d04d06ae3a96700c1/virtualenv/lib/python3.11/site-packages/datatrove/pipeline/dedup/minhash.py", line 679, in <listcomp>
    pq = [next(sig_reader) for sig_reader in sig_readers]
          ^^^^^^^^^^^^^^^^
StopIteration
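
For context, the failing line calls next() on every signature reader without a default, so a single reader that yields nothing is enough to raise StopIteration. Below is a minimal standalone sketch of that pattern and one possible guard. It does not use datatrove: the function names (init_priority_queue, merge_sorted) and the plain-integer "signatures" are hypothetical, and the assumption that an empty reader is what triggers the error here is unconfirmed.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple


def init_priority_queue(readers: List[Iterator[int]]) -> List[Tuple[int, int]]:
    """Seed a heap with the first item of each reader, skipping empty readers.

    Hypothetical helper: assumes readers never yield None as a real item.
    """
    pq = []
    for reader_id, reader in enumerate(readers):
        first = next(reader, None)  # default avoids StopIteration on an empty reader
        if first is not None:
            pq.append((first, reader_id))
    heapq.heapify(pq)
    return pq


def merge_sorted(sources: List[Iterable[int]]) -> Iterator[int]:
    """K-way merge of already-sorted sources, tolerating empty ones."""
    readers = [iter(source) for source in sources]
    pq = init_priority_queue(readers)
    while pq:
        value, reader_id = heapq.heappop(pq)
        yield value
        following = next(readers[reader_id], None)
        if following is not None:
            heapq.heappush(pq, (following, reader_id))


def unguarded_init(readers: List[Iterator[int]]) -> List[int]:
    """Same shape as the line in the traceback: fails if any reader is empty."""
    return [next(reader) for reader in readers]


if __name__ == "__main__":
    print(list(merge_sorted([[1, 4], [], [2, 3]])))
    try:
        unguarded_init([iter([1]), iter([])])
    except StopIteration:
        print("unguarded version raised StopIteration")
```

Whether silently skipping an empty reader is the right behavior for index building is a separate design question; failing with a clearer error message may be preferable.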

nelson-liu commented Jan 30, 2025

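Though actually, it seems like index creation is more efficient as a byproduct of MinhashDedupBuckets? For example, the two stages below should produce equivalent indexes, but the MinhashDedupBuckets version is far more parallelized (minhash_config.num_buckets * 50 tasks instead of minhash_config.num_buckets) and thus could be faster, even though it also has to identify the duplicates: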

# Option 1: build the index as a byproduct of MinhashDedupBuckets
stage2 = RayPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{minhash_base_path}/signatures",
            output_folder=f"{minhash_base_path}/buckets",
            index_folder=f"{minhash_base_path}/index",
            create_index_name="fineweb-edu-index",
            only_dedup_in_index=True,
            config=minhash_config,
        ),
    ],
    tasks=minhash_config.num_buckets * 50,  # 50 tasks per bucket
    randomize_start_duration=180,
    logging_dir=f"{minhash_logs_path}/index",
    depends=stage1,
    memory_bytes_per_task=16 * 1024 * 1024 * 1024,  # 16 GiB
)

# Option 2: build the index directly with MinhashBuildIndex
# (this is the stage that raises StopIteration above)
stage2 = RayPipelineExecutor(
    pipeline=[
        MinhashBuildIndex(
            input_folder=f"{minhash_base_path}/signatures",
            output_folder=f"{minhash_base_path}/index",
            index_name="fineweb-edu-index",
            config=minhash_config,
            lines_to_buffer=1000,
        ),
    ],
    tasks=minhash_config.num_buckets,  # one task per bucket
    memory_bytes_per_task=16 * 1024 * 1024 * 1024,  # 16 GiB
    logging_dir=f"{minhash_logs_path}/index",
    depends=stage1,
)
