From 94e5bdc824ed00cb7b9cf4289f1814f5e1766e0d Mon Sep 17 00:00:00 2001
From: muzzynine
Date: Wed, 22 Jan 2025 17:51:03 +0900
Subject: [PATCH] Fix URL deduplication issue when using the index

---
 src/datatrove/pipeline/dedup/url_dedup.py | 16 ++++++++-----
 tests/pipeline/test_url_deduplication.py  | 29 +++++++++++++++++++++++
 2 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/src/datatrove/pipeline/dedup/url_dedup.py b/src/datatrove/pipeline/dedup/url_dedup.py
index 4a9503c2..f5ab3a31 100644
--- a/src/datatrove/pipeline/dedup/url_dedup.py
+++ b/src/datatrove/pipeline/dedup/url_dedup.py
@@ -51,7 +51,7 @@ class HashSig:
     file_stem: str
 
     def is_from_index(self):
-        return self.doc_id == -1 and self.priority == 1
+        return self.doc_id == -1 and self.priority == 65536
 
     def __lt__(self, other: "HashSig") -> bool:
         # Ensure that highest priority is always first of the hashes
@@ -168,7 +168,7 @@ def read_sigs(
             assert last is None or data[0] >= last, f"Hash order error. {f.tell()=}, {data[0]=}, {last=}"
             last = data[0]
             yield (
-                HashSig(hash_value=data[0], doc_id=-1, file_id=file_id, priority=-1, file_stem=file_stem)
+                HashSig(hash_value=data[0], doc_id=-1, file_id=file_id, priority=65536, file_stem=file_stem)
                 if index_file
                 else HashSig(
                     file_id=file_id,
@@ -248,7 +248,7 @@ def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1
                         index_file=True,
                         lines_to_buffer=self.lines_to_buffer,
                     )
-                    for file_i, file in enumerate(self.data_folder.open_files(index_files))
+                    for file_i, file in enumerate(self.index_folder.open_files(index_files))
                 ]
             )
 
@@ -271,12 +271,15 @@ def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1
             packer = struct.Struct("
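
Note on the fix, for review: the old index-entry sentinel collided with real
data. is_from_index tested priority == 1, a value that ordinary documents can
legitimately carry, while read_sigs emitted index signatures with priority=-1,
which sorts *after* any same-hash document in the heap, so a document already
present in the index was kept rather than dropped. The new sentinel 65536
cannot collide and always wins the tie, assuming document priorities are
packed as uint16 and thus capped at 65535 (that range is inferred from the
sentinel choice, not shown in this patch). The data_folder -> index_folder
change simply opens the index files from the same folder they were listed
from.

Below is a minimal, runnable sketch of the ordering argument. The HashSig here
is a stripped-down stand-in for datatrove's class (file_id and file_stem
omitted), not the actual implementation; the comparison mirrors the patched
__lt__.

    import heapq
    from dataclasses import dataclass


    @dataclass
    class HashSig:
        hash_value: int
        priority: int
        doc_id: int

        def is_from_index(self) -> bool:
            return self.doc_id == -1 and self.priority == 65536

        def __lt__(self, other: "HashSig") -> bool:
            # Highest priority wins ties on hash_value, as in the patch
            return (self.hash_value, -self.priority, self.doc_id) < (
                other.hash_value,
                -other.priority,
                other.doc_id,
            )


    # The same URL hash appears both in a pre-built index and in the data.
    index_sig = HashSig(hash_value=42, priority=65536, doc_id=-1)
    doc_sig = HashSig(hash_value=42, priority=1, doc_id=7)

    pq = [doc_sig, index_sig]
    heapq.heapify(pq)

    assert heapq.heappop(pq).is_from_index()      # index entry pops first,
    assert not heapq.heappop(pq).is_from_index()  # so doc_id=7 is a duplicate

    # With the old sentinel (priority=-1) the index tuple (42, 1, -1) sorted
    # after the document tuple (42, -1, 7): the document popped first and was
    # wrongly kept despite already existing in the index.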