From adf53f49e72e86d40da23e519a48434c35113a13 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20=C5=A0koda?=
Date: Fri, 3 Nov 2023 11:32:17 +0100
Subject: [PATCH] remove accidentally committed files, fix ci

---
 .github/workflows/dev.yml |   3 +-
 lakeapi/_read_parquet.py  | 251 --------------------------------------
 lakeapi/orderbook.py      |  53 --------
 lakeapi/test.py           |  64 ----------
 requirements.txt          |  12 +-
 setup.py                  |   2 +-
 tests/test_orderbook.py   |  55 ---------
 7 files changed, 10 insertions(+), 430 deletions(-)
 delete mode 100644 lakeapi/orderbook.py
 delete mode 100644 lakeapi/test.py
 delete mode 100644 tests/test_orderbook.py

diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml
index f82d5ef..3c6e08b 100644
--- a/.github/workflows/dev.yml
+++ b/.github/workflows/dev.yml
@@ -19,8 +19,9 @@ jobs:
   test:
     # The type of runner that the job will run on
     strategy:
+      fail-fast: false
       matrix:
-        python-versions: [3.7, '3.10', '3.11']
+        python-versions: [3.7, '3.10', '3.12']
         os: [ubuntu-20.04]
     runs-on: ${{ matrix.os }}
     timeout-minutes: 10
diff --git a/lakeapi/_read_parquet.py b/lakeapi/_read_parquet.py
index 14e8eb1..364ffa4 100644
--- a/lakeapi/_read_parquet.py
+++ b/lakeapi/_read_parquet.py
@@ -67,83 +67,6 @@ def _pyarrow_parquet_file_wrapper(
         raise
 
 
-# def _read_parquet_metadata_file(
-#     path: str,
-#     boto3_session: boto3.Session,
-#     s3_additional_kwargs: Optional[Dict[str, str]],
-#     use_threads: Union[bool, int],
-#     version_id: Optional[str] = None,
-#     ignore_null: bool = False,
-#     pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
-# ) -> Optional[Dict[str, str]]:
-#     pyarrow_args = _set_default_pyarrow_additional_kwargs(pyarrow_additional_kwargs)
-#     with open_s3_object(
-#         path=path,
-#         mode="rb",
-#         version_id=version_id,
-#         use_threads=use_threads,
-#         s3_block_size=131_072,  # 128 KB (128 * 2**10)
-#         s3_additional_kwargs=s3_additional_kwargs,
-#         boto3_session=boto3_session,
-#     ) as f:
-#         pq_file: Optional[pyarrow.parquet.ParquetFile] = _pyarrow_parquet_file_wrapper(
-#             source=f, coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"], path = path,
-#         )
-#         if pq_file is None:
-#             return None
-#         return _data_types.athena_types_from_pyarrow_schema(
-#             schema=pq_file.schema.to_arrow_schema(), partitions=None, ignore_null=ignore_null
-#         )[0]
-
-
-# def _read_schemas_from_files(
-#     paths: List[str],
-#     sampling: float,
-#     use_threads: Union[bool, int],
-#     boto3_session: boto3.Session,
-#     s3_additional_kwargs: Optional[Dict[str, str]],
-#     version_ids: Optional[Dict[str, str]] = None,
-#     ignore_null: bool = False,
-#     pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
-# ) -> Tuple[Dict[str, str], ...]:
-
-#     paths = _utils.list_sampling(lst=paths, sampling=sampling)
-#     schemas: Tuple[Optional[Dict[str, str]], ...] = tuple()
-#     n_paths: int = len(paths)
-#     cpus: int = _utils.ensure_cpu_count(use_threads)
-#     if cpus == 1 or n_paths == 1:
-#         schemas = tuple(
-#             _read_parquet_metadata_file(
-#                 path=p,
-#                 boto3_session=boto3_session,
-#                 s3_additional_kwargs=s3_additional_kwargs,
-#                 use_threads=use_threads,
-#                 version_id=version_ids.get(p) if isinstance(version_ids, dict) else None,
-#                 ignore_null=ignore_null,
-#                 pyarrow_additional_kwargs=pyarrow_additional_kwargs,
-#             )
-#             for p in paths
-#         )
-#     elif n_paths > 1:
-#         versions = [version_ids.get(p) if isinstance(version_ids, dict) else None for p in paths]
-#         with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
-#             schemas = tuple(
-#                 executor.map(
-#                     _read_parquet_metadata_file,
-#                     paths,
-#                     itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),  # Boto3.Session
-#                     itertools.repeat(s3_additional_kwargs),
-#                     itertools.repeat(use_threads),
-#                     versions,
-#                     itertools.repeat(ignore_null),
-#                     itertools.repeat(pyarrow_additional_kwargs),
-#                 )
-#             )
-#     schemas = cast(Tuple[Dict[str, str], ...], tuple(x for x in schemas if x is not None))
-#     _logger.debug("schemas: %s", schemas)
-#     return schemas
-
-
 def _merge_schemas(schemas: Tuple[Dict[str, str], ...]) -> Dict[str, str]:
     columns_types: Dict[str, str] = {}
     for schema in schemas:
@@ -156,66 +79,6 @@ def _merge_schemas(schemas: Tuple[Dict[str, str], ...]) -> Dict[str, str]:
     return columns_types
 
 
-# def _read_parquet_metadata(
-#     path: Union[str, List[str]],
-#     path_suffix: Optional[str],
-#     path_ignore_suffix: Optional[str],
-#     ignore_empty: bool,
-#     ignore_null: bool,
-#     dtype: Optional[Dict[str, str]],
-#     sampling: float,
-#     dataset: bool,
-#     use_threads: Union[bool, int],
-#     boto3_session: boto3.Session,
-#     s3_additional_kwargs: Optional[Dict[str, str]],
-#     version_id: Optional[Union[str, Dict[str, str]]] = None,
-#     pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
-# ) -> Tuple[Dict[str, str], Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]:
-#     """Handle wr.s3.read_parquet_metadata internally."""
-#     path_root: Optional[str] = _get_path_root(path=path, dataset=dataset)
-#     paths: List[str] = _path2list(
-#         path=path,
-#         boto3_session=boto3_session,
-#         suffix=path_suffix,
-#         ignore_suffix=_get_path_ignore_suffix(path_ignore_suffix=path_ignore_suffix),
-#         ignore_empty=ignore_empty,
-#         s3_additional_kwargs=s3_additional_kwargs,
-#     )
-
-#     # Files
-#     schemas: Tuple[Dict[str, str], ...] = _read_schemas_from_files(
-#         paths=paths,
-#         sampling=sampling,
-#         use_threads=use_threads,
-#         boto3_session=boto3_session,
-#         s3_additional_kwargs=s3_additional_kwargs,
-#         version_ids=version_id
-#         if isinstance(version_id, dict)
-#         else {paths[0]: version_id}
-#         if isinstance(version_id, str)
-#         else None,
-#         ignore_null=ignore_null,
-#         pyarrow_additional_kwargs=pyarrow_additional_kwargs,
-#     )
-#     columns_types: Dict[str, str] = _merge_schemas(schemas=schemas)
-
-#     # Partitions
-#     partitions_types: Optional[Dict[str, str]] = None
-#     partitions_values: Optional[Dict[str, List[str]]] = None
-#     if (dataset is True) and (path_root is not None):
-#         partitions_types, partitions_values = _extract_partitions_metadata_from_paths(path=path_root, paths=paths)
-
-#     # Casting
-#     if dtype:
-#         for k, v in dtype.items():
-#             if columns_types and k in columns_types:
-#                 columns_types[k] = v
-#             if partitions_types and k in partitions_types:
-#                 partitions_types[k] = v
-
-    return columns_types, partitions_types, partitions_values
-
-
 def _apply_index(df: pd.DataFrame, metadata: Dict[str, Any]) -> pd.DataFrame:
     index_columns: List[Any] = metadata["index_columns"]
     ignore_index: bool = True
@@ -751,120 +614,6 @@ def read_parquet(
     )
 
 
-# @apply_configs
-# def read_parquet_metadata(
-#     path: Union[str, List[str]],
-#     version_id: Optional[Union[str, Dict[str, str]]] = None,
-#     path_suffix: Optional[str] = None,
-#     path_ignore_suffix: Optional[str] = None,
-#     ignore_empty: bool = True,
-#     ignore_null: bool = False,
-#     dtype: Optional[Dict[str, str]] = None,
-#     sampling: float = 1.0,
-#     dataset: bool = False,
-#     use_threads: Union[bool, int] = True,
-#     boto3_session: Optional[boto3.Session] = None,
-#     s3_additional_kwargs: Optional[Dict[str, Any]] = None,
-#     pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
-# ) -> Tuple[Dict[str, str], Optional[Dict[str, str]]]:
-#     """Read Apache Parquet file(s) metadata from a received S3 prefix or list of S3 objects paths.
-
-#     The concept of Dataset goes beyond the simple idea of files and enable more
-#     complex features like partitioning and catalog integration (AWS Glue Catalog).
-
-#     This function accepts Unix shell-style wildcards in the path argument.
-#     * (matches everything), ? (matches any single character),
-#     [seq] (matches any character in seq), [!seq] (matches any character not in seq).
-#     If you want to use a path which includes Unix shell-style wildcard characters (`*, ?, []`),
-#     you can use `glob.escape(path)` before passing the path to this function.
-
-#     Note
-#     ----
-#     In case of `use_threads=True` the number of threads
-#     that will be spawned will be gotten from os.cpu_count().
-
-#     Parameters
-#     ----------
-#     path : Union[str, List[str]]
-#         S3 prefix (accepts Unix shell-style wildcards)
-#         (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
-#     version_id: Optional[Union[str, Dict[str, str]]]
-#         Version id of the object or mapping of object path to version id.
-#         (e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
-#     path_suffix: Union[str, List[str], None]
-#         Suffix or List of suffixes to be read (e.g. [".gz.parquet", ".snappy.parquet"]).
-#         If None, will try to read all files. (default)
-#     path_ignore_suffix: Union[str, List[str], None]
-#         Suffix or List of suffixes for S3 keys to be ignored.(e.g. [".csv", "_SUCCESS"]).
-#         If None, will try to read all files. (default)
-#     ignore_empty: bool
-#         Ignore files with 0 bytes.
-#     ignore_null: bool
-#         Ignore columns with null type.
-#     dtype : Dict[str, str], optional
-#         Dictionary of columns names and Athena/Glue types to be casted.
-#         Useful when you have columns with undetermined data types as partitions columns.
-#         (e.g. {'col name': 'bigint', 'col2 name': 'int'})
-#     sampling : float
-#         Random sample ratio of files that will have the metadata inspected.
-#         Must be `0.0 < sampling <= 1.0`.
-#         The higher, the more accurate.
-#         The lower, the faster.
-#     dataset: bool
-#         If True read a parquet dataset instead of simple file(s) loading all the related partitions as columns.
-#     use_threads : bool, int
-#         True to enable concurrent requests, False to disable multiple threads.
-#         If enabled os.cpu_count() will be used as the max number of threads.
-#         If integer is provided, specified number is used.
-#     boto3_session : boto3.Session(), optional
-#         Boto3 Session. The default boto3 session will be used if boto3_session receive None.
-#     s3_additional_kwargs : Optional[Dict[str, Any]]
-#         Forward to botocore requests, only "SSECustomerAlgorithm" and "SSECustomerKey" arguments will be considered.
-#     pyarrow_additional_kwargs: Optional[Dict[str, Any]]
-#         Forward kwargs to parquet reader currently only excepts "coerce_int96_timestamp_unit". Which can be used to cast
-#         deprecated Parquet INT96 into a specified timestamp unit (e.g. "ms").
-
-#     Returns
-#     -------
-#     Tuple[Dict[str, str], Optional[Dict[str, str]]]
-#         columns_types: Dictionary with keys as column names and values as
-#         data types (e.g. {'col0': 'bigint', 'col1': 'double'}). /
-#         partitions_types: Dictionary with keys as partition names
-#         and values as data types (e.g. {'col2': 'date'}).
-
-#     Examples
-#     --------
-#     Reading all Parquet files (with partitions) metadata under a prefix
-
-#     >>> import awswrangler as wr
-#     >>> columns_types, partitions_types = wr.s3.read_parquet_metadata(path='s3://bucket/prefix/', dataset=True)
-
-#     Reading all Parquet files metadata from a list
-
-#     >>> import awswrangler as wr
-#     >>> columns_types, partitions_types = wr.s3.read_parquet_metadata(path=[
-#     ...     's3://bucket/filename0.parquet',
-#     ...     's3://bucket/filename1.parquet'
-#     ... ])
-
-#     """
-#     return _read_parquet_metadata(
-#         path=path,
-#         version_id=version_id,
-#         path_suffix=path_suffix,
-#         path_ignore_suffix=path_ignore_suffix,
-#         ignore_empty=ignore_empty,
-#         ignore_null=ignore_null,
-#         dtype=dtype,
-#         sampling=sampling,
-#         dataset=dataset,
-#         use_threads=use_threads,
-#         s3_additional_kwargs=s3_additional_kwargs,
-#         boto3_session=_utils.ensure_session(session=boto3_session),
-#         pyarrow_additional_kwargs=pyarrow_additional_kwargs,
-#     )[:2]
-
-
 def _set_default_pyarrow_additional_kwargs(pyarrow_additional_kwargs: Optional[Dict[str, Any]]) -> Dict[str, Any]:
     if pyarrow_additional_kwargs is None:
         pyarrow_additional_kwargs = {}
diff --git a/lakeapi/orderbook.py b/lakeapi/orderbook.py
deleted file mode 100644
index 585ada2..0000000
--- a/lakeapi/orderbook.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# import numpy as np
-from typing import TYPE_CHECKING, Optional, Tuple
-from numba import float64, njit, jit
-# from numba.core import types
-from numba.typed import Dict
-
-if TYPE_CHECKING:
-    import pandas as pd
-
-
-class OrderBookUpdater:
-    ''' Maintains order book snapshot while iterating over a dataframe with order book deltas. '''
-    def __init__(self, df: 'pd.DataFrame'):
-        self.bid = Dict.empty(key_type = float64, value_type = float64)
-        self.ask = Dict.empty(key_type = float64, value_type = float64)
-        self.current_index = 0
-        self.received_timestamp = None
-        self.np_arr = df[['bids', 'asks']].to_numpy()
-
-    @staticmethod
-    @njit(cache = False)
-    def _update(bids, asks, bid_book, ask_book):
-        if len(bids):
-            for price, size in bids:
-                if size == 0:
-                    if price in bid_book:
-                        del bid_book[price]
-                else:
-                    bid_book[price] = size
-        if len(asks) > 0:
-            for price, size in asks:
-                if size == 0:
-                    if price in ask_book:
-                        del ask_book[price]
-                else:
-                    ask_book[price] = size
-
-    def process_next_row(self, row: Optional[int] = None) -> None:
-        ''' row in df contains received_time, bid and ask columns with numpy list of price-quantity pairs'''
-        if self.current_index >= self.np_arr.shape[0]:
-            # return
-            raise StopIteration
-        if row is not None:
-            self.current_index = row
-
-        self._update(*self.np_arr[self.current_index], self.bid, self.ask)
-        # self.received_timestamp = self.np_arr[self.current_index][0]
-        self.current_index += 1
-
-    def get_bests(self) -> Tuple[float, float]:
-        # TODO speed up
-        # return list(self.bid.keys())[-1], next(iter(self.ask.keys()))
-        return max(self.bid), min(self.ask)
diff --git a/lakeapi/test.py b/lakeapi/test.py
deleted file mode 100644
index 94a306c..0000000
--- a/lakeapi/test.py
+++ /dev/null
@@ -1,64 +0,0 @@
-# import boto3
-# from datetime import datetime, timedelta
-
-# cloudfront_key_pair_id = 'YOUR_CLOUDFRONT_KEY_PAIR_ID'
-# cloudfront_private_key_path = 'PATH_TO_YOUR_CLOUDFRONT_PRIVATE_KEY'
-
-# def generate_signed_url(cloudfront_url):
-#     cloudfront_signer = boto3.client('cloudfront', region_name='us-east-1')
-#     expires = datetime.now() + timedelta(minutes=5) # URL will expire in 5 minutes
-#     cloudfront_url = cloudfront_signer.generate_presigned_url(
-#         url=cloudfront_url,
-#         expires_in=300,
-#         key_pair_id=cloudfront_key_pair_id,
-#         private_key=open(cloudfront_private_key_path).read(),
-#     )
-#     return cloudfront_url
-
-
-import boto3
-# import requests
-
-# # Set up the CloudFront signed URL
-# cloudfront_url = 'http://data.crypto-lake.com/book/exchange=BINANCE_FUTURES/symbol=BTC-USDT-PERP/dt=2023-09-13/1.snappy.parquet'
-# cloudfront_signer = boto3.client('cloudfront', region_name='eu-west-1')
-# signed_url = cloudfront_signer.generate_presigned_url(
-#     cloudfront_url,
-# )
-
-# # Set up the S3 request
-# # s3_url = 'YOUR_S3_URL'
-# # s3_headers = {'Authorization': 'AWS YOUR_ACCESS_KEY_ID:YOUR_SECRET_ACCESS_KEY'}
-# s3_response = requests.get(signed_url)
-
-# # Print the S3 response
-# print(s3_response.content)
-
-s3 = boto3.resource('s3')
-cf = boto3.client('cloudfront')
-
-
-video_url = s3.generate_url(
-    60,
-    'GET',
-    bucket = 'qnt.data',
-    key = 'market-data/cryptofeed/book/exchange=BINANCE_FUTURES/symbol=BTC-USDT-PERP/dt=2023-09-13/1.snappy.parquet',
-    force_http = True
-)
-
-origin = cf.origin.S3Origin( "{}.s3.amazonaws.com".format( settings.CONFIG.AWS_VIDEO_STORAGE_BUCKET_NAME ) )
-cloudf = boto.connect_cloudfront( 'AKIA3URKA4L4U4Q2C4V5', 'cJHv5yRcNQ9QIsDGUWnWuLdncX3uQ9HGs+Ul7j8P' )
-stream_distributions = cloudf.get_all_streaming_distributions()
-print(stream_distributions)
-
-# if not len(stream_distributions):
-#     distro = cloudf.create_streaming_distribution( origin = origin, enabled = True, comment = 'Video streaming distribution' )
-# else:
-#     distro = stream_distributions[0]
-
-# return distro.create_signed_url(
-#     video_url,
-#     expire_time = int( time.time() + 3000 ),
-#     private_key_file = self.premium_video_url
-# )
diff --git a/requirements.txt b/requirements.txt
index 1ca25f6..9094879 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,21 +1,23 @@
 bump2version==0.5.11
-watchdog==0.9.0
+# watchdog==0.9.0
 flake8==3.7.8
 tox==3.26.0
 coverage==4.5.4
 Sphinx==5.2.3
 twine==4.0.1
-Click==7.1.2
+# Click==7.1.2
 pytest==7.1.3
-jinja2==3.1.2
+# jinja2==3.1.2
 typing-extensions==4.4.0; python_version < '3.8'
 pandas==1.2.0; python_version < '3.8'
-pandas==1.5.1; python_version >= '3.8'
+pandas==2.1.2; python_version >= '3.8'
 boto3==1.24.89
 cachetools_ext==0.0.8
 botocache==0.0.4
 # awswrangler==2.19.0
 joblib==1.2.0
 tqdm==4.64.1
-wheel==0.41.1
+# wheel==0.41.1
+pyarrow==12.0.0; python_version < '3.12'
+pyarrow==14.0.0; python_version >= '3.12'
 
diff --git a/setup.py b/setup.py
index 4e1dcba..5def001 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@ requirements = [
     'pandas>=1.0.5', 'boto3>=1.24,<2',
     'cachetools_ext>=0.0.8,<0.1.0', 'botocache>=0.0.4,<0.1.0',
-    'joblib>=1.0.0', 'tqdm>=4,<5',
+    'joblib>=1.0.0', 'tqdm>=4,<5', 'pyarrow>=2.0.0,<15',
     'typing-extensions>=4.0,<5; python_version < \'3.8\''
 ]
diff --git a/tests/test_orderbook.py b/tests/test_orderbook.py
deleted file mode 100644
index e962e6a..0000000
--- a/tests/test_orderbook.py
+++ /dev/null
@@ -1,55 +0,0 @@
-import pandas as pd
-import numpy as np
-import pytest
-
-from lakeapi.orderbook import OrderBookUpdater
-
-@pytest.fixture
-def order_book_updater(example_data):
-    df = pd.DataFrame(example_data)
-    df['bids'] = df['bids'].apply(lambda x: np.array(x))
-    df['asks'] = df['asks'].apply(lambda x: np.array(x))
-    return OrderBookUpdater(df)
-
-@pytest.fixture
-def example_data():
-    return [
-        {
-            'received_time': 1,
-            'bids': [(1, 10), (2, 20)],
-            'asks': [(3, 10), (4, 20)]
-        },
-        {
-            'received_time': 2,
-            'bids': [(1, 5)],
-            'asks': [(3, 5)]
-        },
-        {
-            'received_time': 3,
-            'bids': [(2, 0)],
-            'asks': [(4, 0)]
-        }
-    ]
-
-def test_process_next_row(order_book_updater, example_data):
-    order_book_updater.process_next_row()
-    assert order_book_updater.bid == dict(example_data[0]['bids'])
-    assert order_book_updater.ask == dict(example_data[0]['asks'])
-    assert order_book_updater.received_timestamp == example_data[0]['received_time']
-
-    order_book_updater.process_next_row()
-    assert order_book_updater.bid[1] == 5
-    assert order_book_updater.bid[2] == 20
-
-    order_book_updater.process_next_row()
-    assert order_book_updater.ask == {3: 5}
-    assert order_book_updater.received_timestamp == example_data[-1]['received_time']
-
-
-def test_get_bests(order_book_updater):
-    order_book_updater.process_next_row()
-    assert order_book_updater.get_bests() == (2, 3)
-
-@pytest.mark.benchmark(group='process_next_row')
-def test_process_next_row_benchmark(order_book_updater, benchmark):
-    benchmark.pedantic(order_book_updater.process_next_row, args = (0,), warmup_rounds=100, iterations=1000, rounds=10)