Fix URL deduplication issue when using the index
muzzynine committed Jan 22, 2025
1 parent 2260603 commit 94e5bdc
Showing 2 changed files with 39 additions and 6 deletions.
16 changes: 10 additions & 6 deletions src/datatrove/pipeline/dedup/url_dedup.py
@@ -51,7 +51,7 @@ class HashSig:
     file_stem: str

     def is_from_index(self):
-        return self.doc_id == -1 and self.priority == 1
+        return self.doc_id == -1 and self.priority == 65536

     def __lt__(self, other: "HashSig") -> bool:
         # Ensure that highest priority is always first of the hashes
@@ -168,7 +168,7 @@ def read_sigs(
             assert last is None or data[0] >= last, f"Hash order error. {f.tell()=}, {data[0]=}, {last=}"
             last = data[0]
             yield (
-                HashSig(hash_value=data[0], doc_id=-1, file_id=file_id, priority=-1, file_stem=file_stem)
+                HashSig(hash_value=data[0], doc_id=-1, file_id=file_id, priority=65536, file_stem=file_stem)
                 if index_file
                 else HashSig(
                     file_id=file_id,
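
Note on the sentinel: before this commit the two halves disagreed, with read_sigs creating index signatures at priority=-1 while is_from_index() tested priority == 1, so index entries were never recognized during the merge. Both sides now use 65536 which, assuming document priorities are packed as unsigned 16-bit values (1-65535), no real document can carry; per the __lt__ comment above it also sorts an index entry ahead of same-hash document signatures. A minimal sketch of the invariant (hypothetical values):

    INDEX_PRIORITY = 65536

    def is_from_index(doc_id: int, priority: int) -> bool:
        # mirrors HashSig.is_from_index after this commit
        return doc_id == -1 and priority == INDEX_PRIORITY

    assert is_from_index(-1, 65536)      # entry produced by read_sigs for an index file
    assert not is_from_index(7, 65535)   # a real document at the assumed priority cap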
@@ -248,7 +248,7 @@ def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1
                             index_file=True,
                             lines_to_buffer=self.lines_to_buffer,
                         )
-                        for file_i, file in enumerate(self.data_folder.open_files(index_files))
+                        for file_i, file in enumerate(self.index_folder.open_files(index_files))
                     ]
                 )
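
The index file list is produced relative to self.index_folder (UrlFindDedups takes a separate index_folder, as the new test below shows), so opening those names through self.data_folder resolved them against the wrong root. A tiny illustration with hypothetical paths:

    from pathlib import Path

    # list_files() returns names relative to the folder it was called on
    data_folder, index_folder = Path("/work/sigs"), Path("/work/index")
    index_files = ["index.c4_index"]  # hypothetical index file name

    print(data_folder / index_files[0])   # /work/sigs/index.c4_index  (old: wrong root)
    print(index_folder / index_files[0])  # /work/index/index.c4_index (fixed)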

@@ -271,12 +271,15 @@ def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1
             packer = struct.Struct("<I")
             while pq:
                 v: HashSig = heapq.heappop(pq)
+
                 if last and last.hash_value == v.hash_value and not v.is_from_index():
                     out_filename = f"{rank:04d}/{v.file_stem}{ExtensionHelperSD.stage_2_duplicates}"
                     if not index_files or last.is_from_index() or not self.config.only_dedup_in_index:
                         doc_id_bytes = packer.pack(v.doc_id)
                         output_mg.write(out_filename, doc_id_bytes)
-                last = v
+
+                if not last or last.hash_value != v.hash_value or not last.is_from_index():
+                    last = v
                 new_v = next(sig_readers[v.file_id], None)

                 if new_v:
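
Why the guarded update matters: signatures pop in hash order with the index entry first in its run (see the __lt__ note above), but the old unconditional last = v replaced that index entry with the first matching document. With only_dedup_in_index enabled, every later duplicate of the same URL was then compared against a plain document and survived. Pinning last to the index entry until the hash changes flags them all. A self-contained toy replay, with HashSig reduced to the fields this logic touches and hypothetical values:

    from dataclasses import dataclass

    @dataclass
    class HashSig:
        hash_value: int
        doc_id: int
        priority: int

        def is_from_index(self) -> bool:
            return self.doc_id == -1 and self.priority == 65536

    # one indexed URL followed by two documents with the same hash, already in pop order
    sigs = [
        HashSig(hash_value=5, doc_id=-1, priority=65536),
        HashSig(hash_value=5, doc_id=0, priority=1),
        HashSig(hash_value=5, doc_id=1, priority=1),
    ]
    last, flagged = None, []
    for v in sigs:
        if last and last.hash_value == v.hash_value and not v.is_from_index():
            if last.is_from_index():  # stand-in for the only_dedup_in_index branch
                flagged.append(v.doc_id)
        # the fix: only move `last` off an index entry once its hash run ends
        if not last or last.hash_value != v.hash_value or not last.is_from_index():
            last = v
    print(flagged)  # [0, 1]; the old unconditional `last = v` flagged only [0]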
Expand Down Expand Up @@ -390,7 +393,7 @@ def __init__(
def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1):
assert world_size == 1, "UrlDedupBuildIndex can only run on a single worker."
with self.stats.time_stats:
sig_files = self.data_folder.list_files(glob_pattern=ExtensionHelperSD.stage_1_signature)
sig_files = self.data_folder.list_files(glob_pattern="*/*" + ExtensionHelperSD.stage_1_signature)
sig_readers = [
read_sigs(file, file_i, self.config.hash_config, lines_to_buffer=self.lines_to_buffer)
for file_i, file in enumerate(self.data_folder.open_files(sig_files))
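
The widened glob looks one directory level down. This assumes the signature stage writes its files into per-bucket subfolders (e.g. 0000/00000 plus the signature extension), which a flat extension glob never matches when '*' does not cross directory separators, leaving the index builder with no input. A sketch with pathlib, which has the same one-level '*' semantics:

    import tempfile
    from pathlib import Path

    # hypothetical layout: signatures live one subfolder deep
    base = Path(tempfile.mkdtemp())
    (base / "0000").mkdir()
    (base / "0000" / "00000.c4_sig").touch()  # ".c4_sig" stands in for stage_1_signature

    print(list(base.glob("*.c4_sig")))    # []: '*' does not descend into 0000/
    print(list(base.glob("*/*.c4_sig")))  # finds 0000/00000.c4_sig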
@@ -399,10 +402,11 @@ def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1
             pq = [next(sig_reader) for sig_reader in sig_readers]
             heapq.heapify(pq)

-            with self.output_folder.open(f"{self.index_name}.{ExtensionHelperSD.index}", mode="wb") as out_f:
+            with self.output_folder.open(f"{self.index_name}{ExtensionHelperSD.index}", mode="wb") as out_f:
                 last = None
                 while pq:
                     v: HashSig = heapq.heappop(pq)
+
                     if last != v.hash_value:
                         out_f.write(struct.pack(f"<{self.config.hash_config.struct_format}", v.hash_value))
                         last = v.hash_value
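A one-character fix: assuming ExtensionHelperSD.index already includes its leading dot (the stage_2_duplicates extension is interpolated without an extra dot in the hunk above), the old f-string produced a doubled dot in the index filename. Sketch with a hypothetical extension value:

    index_name, ext = "index", ".c4_index"  # hypothetical value for ExtensionHelperSD.index
    print(f"{index_name}.{ext}")  # index..c4_index (old)
    print(f"{index_name}{ext}")   # index.c4_index  (fixed)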
29 changes: 29 additions & 0 deletions tests/pipeline/test_url_deduplication.py
@@ -5,6 +5,7 @@

 from datatrove.data import Document
 from datatrove.pipeline.dedup.url_dedup import (
+    UrlDedupBuildIndex,
     UrlDedupConfig,
     UrlDedupFilter,
     UrlDedupSignature,
@@ -109,6 +110,34 @@ def test_url_deduplication_with_normalization(self):
             {"https://example.com", "https://new-site.com"},
         )

+    def test_url_deduplication_with_index(self):
+        signature_creation = UrlDedupSignature(output_folder=self.tmp_dir + "/sigs")
+        index_signature_creation = UrlDedupSignature(output_folder=self.tmp_dir + "/index_sigs")
+        build_index = UrlDedupBuildIndex(
+            data_folder=self.tmp_dir + "/index_sigs",
+            output_folder=self.tmp_dir + "/index",
+            index_name="index",
+            lines_to_buffer=1000,
+        )
+        find_duplicates = UrlFindDedups(
+            data_folder=self.tmp_dir + "/sigs",
+            index_folder=self.tmp_dir + "/index",
+            output_folder=self.tmp_dir + "/dups",
+            lines_to_buffer=1000,
+        )
+        dedup_filter = UrlDedupFilter(data_folder=self.tmp_dir + "/dups")
+
+        index_signature_creation(data=DOCS[:1])
+        build_index()
+        signature_creation(data=DOCS[1:])
+        find_duplicates()
+        docs = list(dedup_filter(data=copy.deepcopy(DOCS[1:])))
+        self.assertEqual(len(docs), 2)
+        self.assertEqual(
+            {doc.metadata["url"] for doc in docs},
+            {doc.metadata["url"] for doc in DOCS[1:]} - {doc.metadata["url"] for doc in DOCS[:1]},
+        )
+
     def test_sd_worker(self):
         config = UrlDedupConfig(document_priority=lambda x: int(x.id))
         signature_creation = UrlDedupSignature(output_folder=self.tmp_dir + "/sigs", config=config)
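Taken together, the new test drives the full index path: DOCS[:1] is signed into index_sigs and built into an index, DOCS[1:] is signed and matched against that index, and the filter must drop exactly the URLs already present in the index, leaving the URL set of DOCS[1:] minus that of DOCS[:1].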
