From 662b9308d25150f8c5938b8f07c5871fd5b9e47d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20=C5=A0koda?=
Date: Mon, 6 Nov 2023 13:01:59 +0100
Subject: [PATCH] cloudfront support

---
 HISTORY.rst         |   5 +
 lakeapi/__init__.py |   2 +-
 lakeapi/main.py     | 249 +++++++++++++++++++++++++++++++++++---------
 requirements.txt    |   1 +
 setup.py            |   2 +-
 5 files changed, 207 insertions(+), 52 deletions(-)

diff --git a/HISTORY.rst b/HISTORY.rst
index 2a261a4..31f2362 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -2,6 +2,11 @@ History
 =======
 
+0.10.0 (2023-11-06)
+-------------------
+
+* More efficient optional data transfer implemented via AWS CloudFront
+
 0.9.1 (2023-11-03)
 ------------------
 
diff --git a/lakeapi/__init__.py b/lakeapi/__init__.py
index a10d49e..d2a977f 100644
--- a/lakeapi/__init__.py
+++ b/lakeapi/__init__.py
@@ -2,6 +2,6 @@
 
 __author__ = """Jan Skoda"""
 __email__ = "skoda@jskoda.cz"
-__version__ = "__version__ = '0.9.1'"
+__version__ = "0.10.0"
 
 from .main import load_data, list_data, available_symbols, set_default_bucket, use_sample_data, cache, set_cache_size_limit # noqa
diff --git a/lakeapi/main.py b/lakeapi/main.py
index 94bea90..b8b0fdd 100644
--- a/lakeapi/main.py
+++ b/lakeapi/main.py
@@ -1,9 +1,15 @@
-from typing import List, Dict, Optional, Any
+from typing import List, Dict, Optional, Any, Tuple
 try:
     from typing import Literal
 except ImportError:
     from typing_extensions import Literal
 import datetime
+import os
+import requests
+import functools
+import io
+import json
+import warnings
 
 import boto3
 import botocore
@@ -11,6 +17,9 @@
 import pandas as pd
 from cachetools_ext.fs import FSLRUCache
 from botocache.botocache import botocache_context
+from aws_requests_auth.aws_auth import AWSRequestsAuth
+import tqdm.contrib.concurrent
+import cachetools
 
 import lakeapi._read_parquet
 import lakeapi._cache
@@ -91,6 +100,11 @@ def load_data(
         bucket = default_bucket
     if boto3_session is None:
         boto3_session = boto3.Session(region_name="eu-west-1")
+    username, method = _login(boto3_session, table)
+    if method == 'upgrade':
+        warnings.warn('This lakeapi version is outdated and might misbehave. Please upgrade to the latest version, e.g. with `pip install -U lakeapi`.')
+    if method == 'forceupgrade':
+        raise Exception('This lakeapi version is outdated. Please upgrade to the latest version, e.g. with `pip install -U lakeapi`.')
 
     def partition_filter(partition: Dict[str, str]) -> bool:
         return (
@@ -110,47 +124,54 @@ def partition_filter(partition: Dict[str, str]) -> bool:
     if exchanges:
         assert exchanges[0].upper() == exchanges[0]
 
-    with botocache_context(
-        cache=cache,
-        action_regex_to_cache=["List.*"],
-        # This helps in logging all calls made to AWS. Useful while debugging. Default value is False.
-        call_log=True,
-        # This supresses warning messages encountered while caching in anonymous mode
-        supress_warning_message=is_anonymous_access,
-    ):
-        last_ex = None
-        for _ in range(2):
-            try:
-                # TODO: log & skip corrupted files
-                df = lakeapi._read_parquet.read_parquet(
-                    path=f"s3://{bucket}/{table}/",
-                    partition_filter=partition_filter,
-                    categories=["side"] if table == "trades" else None,
-                    dataset=True,  # also adds partition columns
-                    boto3_session=boto3_session,
-                    columns=columns,
-                    use_threads=use_threads,
-                    ignore_index=True,
-                )
-                break
-            except botocore.exceptions.ClientError as ex:
-                # When 404 file not found error happens, it means the boto cache of available files is wrong and we need
-                # to clear it and try again.
-                if int(ex.response['Error']['Code']) == 404:
-                    # An error occurred (404) when calling the HeadObject operation: Not Found
-                    cache.clear()
-                    last_ex = ex
-                    continue
-                else:
-                    raise
-            except lakeapi.exceptions.NoFilesFound:
-                if is_anonymous_access:
-                    raise lakeapi.exceptions.NoFilesFound("No data found for your query in the free sample dataset. Please subscribe to access more data.")
-                else:
-                    raise
-        else:
-            # got error 404 both before and after the cache.clear()
-            raise last_ex
+    if method == 'cloudfront':
+        df = _load_data_cloudfront(
+            table = table, start = start, end = end, symbols = symbols, exchanges = exchanges,
+            boto3_session = boto3_session, use_threads = use_threads, username = username
+        )
+    else:
+        with botocache_context(
+            cache=cache,
+            action_regex_to_cache=["List.*"],
+            # This helps in logging all calls made to AWS. Useful while debugging. Default value is False.
+            call_log=True,
+            # This suppresses warning messages encountered while caching in anonymous mode
+            supress_warning_message=is_anonymous_access,
+        ):
+            last_ex = None
+            for _ in range(2):
+                try:
+                    # TODO: log & skip corrupted files
+                    df = lakeapi._read_parquet.read_parquet(
+                        path=f"s3://{bucket}/{table}/",
+                        partition_filter=partition_filter,
+                        categories=["side"] if table == "trades" else None,
+                        dataset=True,  # also adds partition columns
+                        boto3_session=boto3_session,
+                        columns=columns,
+                        use_threads=use_threads,
+                        ignore_index=True,
+                    )
+                    break
+                except botocore.exceptions.ClientError as ex:
+                    # A 404 file-not-found error means the boto cache of available files is
+                    # stale, so we clear it and try again.
+                    if int(ex.response['Error']['Code']) == 404:
+                        # An error occurred (404) when calling the HeadObject operation: Not Found
+                        cache.clear()
+                        last_ex = ex
+                        continue
+                    else:
+                        raise
+                except lakeapi.exceptions.NoFilesFound:
+                    if is_anonymous_access:
+                        raise lakeapi.exceptions.NoFilesFound("No data found for your query in the free sample dataset. Please subscribe to access more data.")
+                    else:
+                        raise
+            else:
+                # got error 404 both before and after the cache.clear()
+                raise last_ex
+
     if drop_partition_cols:
         # useful when loading just one symbol and exchange
         df.drop(columns=["symbol", "exchange", "dt"], inplace=True)
@@ -177,6 +198,132 @@ def partition_filter(partition: Dict[str, str]) -> bool:
         lakeapi._cache._store.reduce_size()
 
     return df
 
+
+@cachetools.cached(cache=FSLRUCache(maxsize=32, path = '.lake_cache/login', ttl=3600), key=lambda sess, table: f'table={table}')
+def _login(boto3_session: boto3.Session, table: str) -> Tuple[str, str]:
+    '''Return the (username, download method) to use for this user and table.'''
+    lambda_client = boto3_session.client('lambda')
+    try:
+        response = lambda_client.invoke(
+            FunctionName='lake-backend-dev',
+            InvocationType='RequestResponse',
+            Payload=json.dumps({
+                'command': 'app.login',
+                'api_key': boto3_session.get_credentials().access_key,
+                'table': table,
+                'anonymous_access': is_anonymous_access,
+                'user_agent': f'lakeapi/{lakeapi.__version__}'
+            }),
+        )
+
+        # Read the response from the Lambda function
+        response_payload = json.loads(response['Payload'].read().decode('utf-8'))
+
+        return response_payload['username'], response_payload['method']
+    except Exception:
+        # On any failure, fall back to plain S3 access
+        return 'unknown', 's3'
+
+
+def _load_data_cloudfront(
+    table: DataType,
+    start: Optional[datetime.datetime] = None,
+    end: Optional[datetime.datetime] = None,
+    symbols: Optional[List[str]] = None,
+    exchanges: Optional[List[str]] = None,
+    *,
+    boto3_session: Optional[boto3.Session] = None,
+    use_threads: bool = True,
+    username: str = 'unknown',
+    # drop_partition_cols: bool = False,
+) -> pd.DataFrame:
+    if boto3_session is None:
+        boto3_session = boto3.Session(region_name="eu-west-1")
+
+    # credentials = boto3_session.get_credentials().get_frozen_credentials()
+    # auth = AWSRequestsAuth(
+    #     aws_access_key=credentials.access_key,
+    #     aws_secret_access_key=credentials.secret_key,
+    #     aws_host='s3.amazonaws.com',
+    #     aws_region='eu-west-1',
+    #     aws_service='lambda'
+    # )
+    # login_response = requests.get(login_api_url, auth = auth, params={'api_key': credentials.access_key})
+    # try:
+    #     username = _login(boto3_session)
+    # except:
+    #     username = 'anonymous'
+
+    # if not exchanges or not symbols or not start or not end:
+    available_data = list_data(table = table, start = start, end = end, symbols = symbols, exchanges = exchanges, boto3_session = boto3_session)
+    df = pd.DataFrame(available_data)
+    if df.empty:
+        raise lakeapi.exceptions.NoFilesFound(f'No data found for your {table} query.')
+    if not exchanges:
+        exchanges = list(df['exchange'].unique())
+    if not symbols:
+        symbols = list(df['symbol'].unique())
+    if not start:
+        start = datetime.datetime.strptime(df['dt'].min(), '%Y-%m-%d')
+    if not end:
+        end = datetime.datetime.strptime(df['dt'].max(), '%Y-%m-%d')
+    # else:
+    #     df = None
+
+    assert exchanges and symbols and start and end
+    # for start in dateutil.rrule.rrule(freq = dateutil.rrule.DAILY, dtstart = start.date(), until = end.date()):
+    #     for exchange in exchanges:
+    #         for symbol in symbols:
+    #             url = f'https://data.crypto-lake.com/market-data/cryptofeed/{table}/exchange={exchange}/symbol={symbol}/dt={start.date()}/1.snappy.parquet'
+
+    objs = []
+    for row in df.itertuples():
+        url = f'https://data.crypto-lake.com/market-data/cryptofeed/{table}/exchange={row.exchange}/symbol={row.symbol}/dt={row.dt}/{row.filename}'
+        objs.append((url, row.symbol, row.exchange, row.dt))
+
+    if use_threads and os.cpu_count():
+        workers = os.cpu_count() + 2
+    else:
+        workers = 1
+
+    dfs = list(tqdm.contrib.concurrent.thread_map(
+        functools.partial(_download_cloudfront, boto3_session, username, symbols, exchanges),
+        objs,
+        max_workers=workers
+    ))
+    return pd.concat([df for df in dfs if not df.empty], ignore_index=True)
+
+
+def _download_cloudfront(session: boto3.Session, username: str, all_symbols: List[str], all_exchanges: List[str], obj: Tuple[str, str, str, str]) -> pd.DataFrame:
+    url, symbol, exchange, dt = obj
+    credentials = session.get_credentials().get_frozen_credentials()
+    auth = AWSRequestsAuth(
+        aws_access_key=credentials.access_key,
+        aws_secret_access_key=credentials.secret_key,
+        aws_host='qnt.data.s3.amazonaws.com',
+        aws_region='eu-west-1',
+        aws_service='s3'
+    )
+    df = _download_cached(auth, url, username)
+    if df.empty:
+        return df
+    if 'side' in df.columns:  # only trades data has a side column
+        df['side'] = df['side'].astype(pd.CategoricalDtype(categories = ['buy', 'sell']))
+    df['symbol'] = pd.Series(symbol, index=df.index, dtype=pd.CategoricalDtype(categories = all_symbols))
+    df['exchange'] = pd.Series(exchange, index=df.index, dtype=pd.CategoricalDtype(categories = all_exchanges))
+    df['dt'] = dt  # partition date; dropped later when drop_partition_cols is set
+    return df
+
+
+@lakeapi._cache.cached(ignore = ['auth', 'username'])
+def _download_cached(auth, url: str, username: str) -> pd.DataFrame:
+    # Streaming would let us parse response.raw into parquet faster:
+    # response = requests.get(url, auth = auth, headers = {'Referer': username, 'User-Agent': f'lakeapi/{lakeapi.__version__}'}, stream = True)
+    # return pd.read_parquet(lakeapi.response_stream.ResponseStream(response.iter_content(chunk_size=1_000_000)), engine='pyarrow')
+
+    response = requests.get(url, auth = auth, headers = {'Referer': username, 'User-Agent': f'lakeapi/{lakeapi.__version__}'})
+
+    if response.status_code == 404:
+        return pd.DataFrame()
+    elif response.status_code != 200:
+        warnings.warn(f'Unexpected status code {response.status_code} for {url}')
+
+    return pd.read_parquet(io.BytesIO(response.content), engine='pyarrow')
+
+
 def list_data(
     table: Optional[DataType],
     start: Optional[datetime.datetime] = None,
@@ -272,14 +419,16 @@ def available_symbols(
     # Test
     # df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 3), end = None, symbols = ['BTC-USDT'], exchanges = ['BINANCE']) # noqa
    # df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = None, exchanges = ['BINANCE']) # noqa
-    use_sample_data(True)
-    df = load_data(
-        table="book",
-        start=None, #datetime.datetime.now() - datetime.timedelta(days=2),
-        end=None,
-        symbols=["FTRB-USDT"],
-        exchanges=None,
-    )
+    df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = ['XCAD-USDT'], exchanges = None) # noqa
+    # df = load_data(table = 'book', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = ['XCAD-USDT'], exchanges = ['KUCOIN']) # noqa
+    # df = _load_data_cloudfront(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = ['XCAD-USDT'], exchanges = None) # noqa
+    # df = load_data(
+    #     table="book",
+    #     start=None, #datetime.datetime.now() - datetime.timedelta(days=2),
+    #     end=None,
+    #     symbols=["FTRB-USDT"],
+    #     exchanges=None,
+    # )
     pd.set_option("display.width", 1000)
     pd.set_option("display.max_columns", 30)
     print(df)
diff --git a/requirements.txt b/requirements.txt
index 9094879..db2d868 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,3 +21,4 @@ tqdm==4.64.1
 # wheel==0.41.1
 pyarrow==12.0.0; python_version < '3.12'
 pyarrow==14.0.0; python_version >= '3.12'
+aws-requests-auth==0.4.3
diff --git a/setup.py b/setup.py
index cead74f..70f6576 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@ requirements = [
     'pandas>=1.0.5', 'boto3>=1.24,<2',
     'cachetools_ext>=0.0.8,<0.1.0', 'botocache>=0.0.4,<0.1.0',
-    'joblib>=1.0.0', 'tqdm>=4,<5', 'pyarrow>=2.0.0,<15',
+    'joblib>=1.0.0', 'tqdm>=4,<5', 'pyarrow>=2.0.0,<15', 'aws-requests-auth>=0.4.3,<0.5',
     'typing-extensions>=4.0,<5; python_version < \'3.8\''
 ]
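
---
Reviewer note, not part of the patch: a minimal usage sketch of the new transfer
path. The download method is negotiated server-side by _login(), which returns
'cloudfront' or 's3' for the given user and table, so callers keep using
load_data() unchanged. The symbols, exchanges, and dates below are illustrative
only (taken from the test comments in this patch).

    import datetime

    import lakeapi

    # Anonymous access works against the free sample dataset; paid access
    # instead relies on AWS credentials picked up by boto3.
    lakeapi.use_sample_data(anonymous_access=True)

    # Internally, load_data() now calls _login() and dispatches either to
    # _load_data_cloudfront() or to the original S3/botocache code path.
    df = lakeapi.load_data(
        table="trades",
        start=datetime.datetime.now() - datetime.timedelta(days=2),
        end=None,
        symbols=["BTC-USDT"],
        exchanges=["BINANCE"],
    )
    print(df.head())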