
Commit

remove accidentally committed files, fix ci
Jan Škoda committed Nov 3, 2023
1 parent 50b3acb commit 6b71f2a
Showing 7 changed files with 8 additions and 430 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dev.yml
@@ -20,7 +20,7 @@ jobs:
# The type of runner that the job will run on
strategy:
matrix:
- python-versions: [3.7, '3.10', '3.11']
+ python-versions: [3.7, '3.10', '3.12']
os: [ubuntu-20.04]
runs-on: ${{ matrix.os }}
timeout-minutes: 10
251 changes: 0 additions & 251 deletions lakeapi/_read_parquet.py
@@ -67,83 +67,6 @@ def _pyarrow_parquet_file_wrapper(
raise


# def _read_parquet_metadata_file(
# path: str,
# boto3_session: boto3.Session,
# s3_additional_kwargs: Optional[Dict[str, str]],
# use_threads: Union[bool, int],
# version_id: Optional[str] = None,
# ignore_null: bool = False,
# pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
# ) -> Optional[Dict[str, str]]:
# pyarrow_args = _set_default_pyarrow_additional_kwargs(pyarrow_additional_kwargs)
# with open_s3_object(
# path=path,
# mode="rb",
# version_id=version_id,
# use_threads=use_threads,
# s3_block_size=131_072, # 128 KB (128 * 2**10)
# s3_additional_kwargs=s3_additional_kwargs,
# boto3_session=boto3_session,
# ) as f:
# pq_file: Optional[pyarrow.parquet.ParquetFile] = _pyarrow_parquet_file_wrapper(
# source=f, coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"], path = path,
# )
# if pq_file is None:
# return None
# return _data_types.athena_types_from_pyarrow_schema(
# schema=pq_file.schema.to_arrow_schema(), partitions=None, ignore_null=ignore_null
# )[0]


# def _read_schemas_from_files(
# paths: List[str],
# sampling: float,
# use_threads: Union[bool, int],
# boto3_session: boto3.Session,
# s3_additional_kwargs: Optional[Dict[str, str]],
# version_ids: Optional[Dict[str, str]] = None,
# ignore_null: bool = False,
# pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
# ) -> Tuple[Dict[str, str], ...]:

# paths = _utils.list_sampling(lst=paths, sampling=sampling)
# schemas: Tuple[Optional[Dict[str, str]], ...] = tuple()
# n_paths: int = len(paths)
# cpus: int = _utils.ensure_cpu_count(use_threads)
# if cpus == 1 or n_paths == 1:
# schemas = tuple(
# _read_parquet_metadata_file(
# path=p,
# boto3_session=boto3_session,
# s3_additional_kwargs=s3_additional_kwargs,
# use_threads=use_threads,
# version_id=version_ids.get(p) if isinstance(version_ids, dict) else None,
# ignore_null=ignore_null,
# pyarrow_additional_kwargs=pyarrow_additional_kwargs,
# )
# for p in paths
# )
# elif n_paths > 1:
# versions = [version_ids.get(p) if isinstance(version_ids, dict) else None for p in paths]
# with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
# schemas = tuple(
# executor.map(
# _read_parquet_metadata_file,
# paths,
# itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)), # Boto3.Session
# itertools.repeat(s3_additional_kwargs),
# itertools.repeat(use_threads),
# versions,
# itertools.repeat(ignore_null),
# itertools.repeat(pyarrow_additional_kwargs),
# )
# )
# schemas = cast(Tuple[Dict[str, str], ...], tuple(x for x in schemas if x is not None))
# _logger.debug("schemas: %s", schemas)
# return schemas


def _merge_schemas(schemas: Tuple[Dict[str, str], ...]) -> Dict[str, str]:
columns_types: Dict[str, str] = {}
for schema in schemas:
@@ -156,66 +79,6 @@ def _merge_schemas(schemas: Tuple[Dict[str, str], ...]) -> Dict[str, str]:
return columns_types


# def _read_parquet_metadata(
# path: Union[str, List[str]],
# path_suffix: Optional[str],
# path_ignore_suffix: Optional[str],
# ignore_empty: bool,
# ignore_null: bool,
# dtype: Optional[Dict[str, str]],
# sampling: float,
# dataset: bool,
# use_threads: Union[bool, int],
# boto3_session: boto3.Session,
# s3_additional_kwargs: Optional[Dict[str, str]],
# version_id: Optional[Union[str, Dict[str, str]]] = None,
# pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
# ) -> Tuple[Dict[str, str], Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]:
# """Handle wr.s3.read_parquet_metadata internally."""
# path_root: Optional[str] = _get_path_root(path=path, dataset=dataset)
# paths: List[str] = _path2list(
# path=path,
# boto3_session=boto3_session,
# suffix=path_suffix,
# ignore_suffix=_get_path_ignore_suffix(path_ignore_suffix=path_ignore_suffix),
# ignore_empty=ignore_empty,
# s3_additional_kwargs=s3_additional_kwargs,
# )

# # Files
# schemas: Tuple[Dict[str, str], ...] = _read_schemas_from_files(
# paths=paths,
# sampling=sampling,
# use_threads=use_threads,
# boto3_session=boto3_session,
# s3_additional_kwargs=s3_additional_kwargs,
# version_ids=version_id
# if isinstance(version_id, dict)
# else {paths[0]: version_id}
# if isinstance(version_id, str)
# else None,
# ignore_null=ignore_null,
# pyarrow_additional_kwargs=pyarrow_additional_kwargs,
# )
# columns_types: Dict[str, str] = _merge_schemas(schemas=schemas)

# # Partitions
# partitions_types: Optional[Dict[str, str]] = None
# partitions_values: Optional[Dict[str, List[str]]] = None
# if (dataset is True) and (path_root is not None):
# partitions_types, partitions_values = _extract_partitions_metadata_from_paths(path=path_root, paths=paths)

# # Casting
# if dtype:
# for k, v in dtype.items():
# if columns_types and k in columns_types:
# columns_types[k] = v
# if partitions_types and k in partitions_types:
# partitions_types[k] = v

return columns_types, partitions_types, partitions_values


def _apply_index(df: pd.DataFrame, metadata: Dict[str, Any]) -> pd.DataFrame:
index_columns: List[Any] = metadata["index_columns"]
ignore_index: bool = True
@@ -751,120 +614,6 @@ def read_parquet(
)


# @apply_configs
# def read_parquet_metadata(
# path: Union[str, List[str]],
# version_id: Optional[Union[str, Dict[str, str]]] = None,
# path_suffix: Optional[str] = None,
# path_ignore_suffix: Optional[str] = None,
# ignore_empty: bool = True,
# ignore_null: bool = False,
# dtype: Optional[Dict[str, str]] = None,
# sampling: float = 1.0,
# dataset: bool = False,
# use_threads: Union[bool, int] = True,
# boto3_session: Optional[boto3.Session] = None,
# s3_additional_kwargs: Optional[Dict[str, Any]] = None,
# pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
# ) -> Tuple[Dict[str, str], Optional[Dict[str, str]]]:
# """Read Apache Parquet file(s) metadata from a received S3 prefix or list of S3 objects paths.

# The concept of Dataset goes beyond the simple idea of files and enable more
# complex features like partitioning and catalog integration (AWS Glue Catalog).

# This function accepts Unix shell-style wildcards in the path argument.
# * (matches everything), ? (matches any single character),
# [seq] (matches any character in seq), [!seq] (matches any character not in seq).
# If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`),
# you can use `glob.escape(path)` before passing the path to this function.

# Note
# ----
# In case of `use_threads=True` the number of threads
# that will be spawned will be gotten from os.cpu_count().

# Parameters
# ----------
# path : Union[str, List[str]]
# S3 prefix (accepts Unix shell-style wildcards)
# (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
# version_id: Optional[Union[str, Dict[str, str]]]
# Version id of the object or mapping of object path to version id.
# (e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
# path_suffix: Union[str, List[str], None]
# Suffix or List of suffixes to be read (e.g. [".gz.parquet", ".snappy.parquet"]).
# If None, will try to read all files. (default)
# path_ignore_suffix: Union[str, List[str], None]
# Suffix or List of suffixes for S3 keys to be ignored.(e.g. [".csv", "_SUCCESS"]).
# If None, will try to read all files. (default)
# ignore_empty: bool
# Ignore files with 0 bytes.
# ignore_null: bool
# Ignore columns with null type.
# dtype : Dict[str, str], optional
# Dictionary of columns names and Athena/Glue types to be casted.
# Useful when you have columns with undetermined data types as partitions columns.
# (e.g. {'col name': 'bigint', 'col2 name': 'int'})
# sampling : float
# Random sample ratio of files that will have the metadata inspected.
# Must be `0.0 < sampling <= 1.0`.
# The higher, the more accurate.
# The lower, the faster.
# dataset: bool
# If True read a parquet dataset instead of simple file(s) loading all the related partitions as columns.
# use_threads : bool, int
# True to enable concurrent requests, False to disable multiple threads.
# If enabled os.cpu_count() will be used as the max number of threads.
# If integer is provided, specified number is used.
# boto3_session : boto3.Session(), optional
# Boto3 Session. The default boto3 session will be used if boto3_session receive None.
# s3_additional_kwargs : Optional[Dict[str, Any]]
# Forward to botocore requests, only "SSECustomerAlgorithm" and "SSECustomerKey" arguments will be considered.
# pyarrow_additional_kwargs: Optional[Dict[str, Any]]
# Forward kwargs to parquet reader currently only excepts "coerce_int96_timestamp_unit". Which can be used to cast
# deprecated Parquet INT96 into a specified timestamp unit (e.g. "ms").

# Returns
# -------
# Tuple[Dict[str, str], Optional[Dict[str, str]]]
# columns_types: Dictionary with keys as column names and values as
# data types (e.g. {'col0': 'bigint', 'col1': 'double'}). /
# partitions_types: Dictionary with keys as partition names
# and values as data types (e.g. {'col2': 'date'}).

# Examples
# --------
# Reading all Parquet files (with partitions) metadata under a prefix

# >>> import awswrangler as wr
# >>> columns_types, partitions_types = wr.s3.read_parquet_metadata(path='s3://bucket/prefix/', dataset=True)

# Reading all Parquet files metadata from a list

# >>> import awswrangler as wr
# >>> columns_types, partitions_types = wr.s3.read_parquet_metadata(path=[
# ... 's3://bucket/filename0.parquet',
# ... 's3://bucket/filename1.parquet'
# ... ])

# """
# return _read_parquet_metadata(
# path=path,
# version_id=version_id,
# path_suffix=path_suffix,
# path_ignore_suffix=path_ignore_suffix,
# ignore_empty=ignore_empty,
# ignore_null=ignore_null,
# dtype=dtype,
# sampling=sampling,
# dataset=dataset,
# use_threads=use_threads,
# s3_additional_kwargs=s3_additional_kwargs,
# boto3_session=_utils.ensure_session(session=boto3_session),
# pyarrow_additional_kwargs=pyarrow_additional_kwargs,
# )[:2]


def _set_default_pyarrow_additional_kwargs(pyarrow_additional_kwargs: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if pyarrow_additional_kwargs is None:
pyarrow_additional_kwargs = {}
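The commented-out helpers deleted above wrapped pyarrow's ParquetFile to pull column schemas out of Parquet objects on S3, using awswrangler-style open_s3_object/boto3 plumbing. As a rough sketch of that idea only — not lakeapi's actual code — the snippet below reads one object's schema with pyarrow and s3fs in place of the boto3-based wrapper; the function name and the bucket/key in the usage line are hypothetical.

import pyarrow.parquet as pq
import s3fs

def read_parquet_schema(path: str) -> dict:
    # Open the S3 object; ParquetFile only reads the footer metadata, not the data pages.
    fs = s3fs.S3FileSystem()
    with fs.open(path, "rb") as f:
        pf = pq.ParquetFile(f)
        # Same conversion the deleted code used: ParquetSchema -> Arrow schema.
        schema = pf.schema.to_arrow_schema()
        return {field.name: str(field.type) for field in schema}

# Hypothetical usage:
# read_parquet_schema("s3://example-bucket/prefix/file0.parquet")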
53 changes: 0 additions & 53 deletions lakeapi/orderbook.py

This file was deleted.

64 changes: 0 additions & 64 deletions lakeapi/test.py

This file was deleted.

3 more changed files not shown.
