Allow custom parquet schema #330

Merged · 3 commits · Jan 30, 2025
src/datatrove/pipeline/writers/huggingface.py (4 changes: 3 additions & 1 deletion)
@@ -2,7 +2,7 @@
 import random
 import tempfile
 import time
-from typing import Callable, Literal
+from typing import Any, Callable, Literal
 
 from huggingface_hub import (
     CommitOperationAdd,
@@ -36,6 +36,7 @@ def __init__(
         cleanup: bool = True,
         expand_metadata: bool = True,
         max_file_size: int = round(4.5 * 2**30),  # 4.5GB, leave some room for the last batch
+        schema: Any = None,
     ):
         """
         This class is intended to upload VERY LARGE datasets. Consider using `push_to_hub` or just using a
@@ -73,6 +74,7 @@ def __init__(
             adapter=adapter,
             expand_metadata=expand_metadata,
             max_file_size=max_file_size,
+            schema=schema,
         )
         self.operations = []
         self._repo_init = False
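For reference, a minimal usage sketch of the forwarded argument, not taken from the PR itself: the constructor call below assumes a dataset repo id and a simple two-field schema purely for illustration.

import pyarrow as pa

from datatrove.pipeline.writers.huggingface import HuggingFaceDatasetWriter

# Hypothetical schema; the field names are illustrative assumptions, not from the PR.
schema = pa.schema([("text", pa.string()), ("id", pa.string())])

writer = HuggingFaceDatasetWriter(
    dataset="username/my-dataset",  # placeholder repo id
    schema=schema,  # new in this PR: passed through to the ParquetWriter base class
)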
src/datatrove/pipeline/writers/parquet.py (8 changes: 5 additions & 3 deletions)
@@ -1,5 +1,5 @@
 from collections import Counter, defaultdict
-from typing import IO, Callable, Literal
+from typing import IO, Any, Callable, Literal
 
 from datatrove.io import DataFolderLike
 from datatrove.pipeline.writers.disk_base import DiskWriter
@@ -19,6 +19,7 @@ def __init__(
         batch_size: int = 1000,
         expand_metadata: bool = False,
         max_file_size: int = 5 * 2**30,  # 5GB
+        schema: Any = None,
     ):
         # Validate the compression setting
         if compression not in {"snappy", "gzip", "brotli", "lz4", "zstd", None}:
@@ -40,6 +41,7 @@ def __init__(
         self._file_counter = Counter()
         self.compression = compression
         self.batch_size = batch_size
+        self.schema = schema
 
     def _on_file_switch(self, original_name, old_filename, new_filename):
         """
@@ -59,7 +61,7 @@ def _write_batch(self, filename):
         import pyarrow as pa
 
         # prepare batch
-        batch = pa.RecordBatch.from_pylist(self._batches.pop(filename))
+        batch = pa.RecordBatch.from_pylist(self._batches.pop(filename), schema=self.schema)
         # write batch
         self._writers[filename].write_batch(batch)
 
@@ -70,7 +72,7 @@ def _write(self, document: dict, file_handler: IO, filename: str):
         if filename not in self._writers:
             self._writers[filename] = pq.ParquetWriter(
                 file_handler,
-                schema=pa.RecordBatch.from_pylist([document]).schema,
+                schema=self.schema if self.schema is not None else pa.RecordBatch.from_pylist([document]).schema,
                 compression=self.compression,
             )
         self._batches[filename].append(document)
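The fallback in `_write` still infers the schema from the first document written to each file, which can misfire when that document carries a null or otherwise atypical value. A small standalone pyarrow sketch (not from the PR) of why an explicit schema helps:

import pyarrow as pa

# Inference from a single record: a None value becomes a null-typed column,
# so later batches with real floats in "score" would no longer match the writer's schema.
inferred = pa.RecordBatch.from_pylist([{"text": "hello", "score": None}]).schema
print(inferred.field("score").type)  # null

# With an explicit schema, the same record keeps the intended column type.
explicit = pa.schema([("text", pa.string()), ("score", pa.float64())])
batch = pa.RecordBatch.from_pylist([{"text": "hello", "score": None}], schema=explicit)
print(batch.schema.field("score").type)  # double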
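Putting it together, one plausible end-to-end use of the new argument; the output path and metadata layout are assumptions for illustration rather than anything prescribed by the PR:

import pyarrow as pa

from datatrove.pipeline.writers.parquet import ParquetWriter

# Fix the Parquet schema up front instead of letting each output file infer its own.
schema = pa.schema(
    [
        ("text", pa.string()),
        ("id", pa.string()),
        ("metadata", pa.map_(pa.string(), pa.string())),  # assumed metadata layout
    ]
)

writer = ParquetWriter(
    output_folder="parquet_output/",  # placeholder output location
    schema=schema,
    compression="snappy",
)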