cloudfront support
Jan Škoda committed Nov 6, 2023
1 parent 2144c40 commit 662b930
Showing 5 changed files with 207 additions and 52 deletions.
5 changes: 5 additions & 0 deletions HISTORY.rst
@@ -2,6 +2,11 @@
History
=======

0.10.0 (2023-11-06)
------------------

* more efficient optional data transfer implemented via AWS CloudFront

0.9.1 (2023-11-03)
------------------

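For context, the new CloudFront transfer is negotiated server-side and the public API of load_data is unchanged. A minimal usage sketch (mirroring the test calls at the bottom of lakeapi/main.py; the symbol, exchange and date range are illustrative):

import datetime
import lakeapi

lakeapi.use_sample_data(True)  # anonymous access to the free sample dataset
df = lakeapi.load_data(
    table='trades',
    start=datetime.datetime.now() - datetime.timedelta(days=2),
    end=None,
    symbols=['BTC-USDT'],
    exchanges=['BINANCE'],
)
print(df.head())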
2 changes: 1 addition & 1 deletion lakeapi/__init__.py
@@ -2,6 +2,6 @@

__author__ = """Jan Skoda"""
__email__ = "[email protected]"
__version__ = "__version__ = '0.9.1'"
__version__ = "0.9.1"

from .main import load_data, list_data, available_symbols, set_default_bucket, use_sample_data, cache, set_cache_size_limit # noqa
249 changes: 199 additions & 50 deletions lakeapi/main.py
@@ -1,16 +1,25 @@
from typing import List, Dict, Optional, Any
from typing import List, Dict, Optional, Any, Tuple
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
import datetime
import os
import requests
import functools
import io
import json
import warnings

import boto3
import botocore
import botocore.exceptions
import pandas as pd
from cachetools_ext.fs import FSLRUCache
from botocache.botocache import botocache_context
from aws_requests_auth.aws_auth import AWSRequestsAuth
import tqdm.contrib.concurrent
import cachetools

import lakeapi._read_parquet
import lakeapi._cache
@@ -91,6 +100,11 @@ def load_data(
bucket = default_bucket
if boto3_session is None:
boto3_session = boto3.Session(region_name="eu-west-1")
username, method = _login(boto3_session, table)
if method == 'upgrade':
warnings.warn('This lakeapi version is outdated and might misbehave. Please upgrade lakeapi to the latest version, e.g. with `pip install -U lakeapi`!')
if method == 'forceupgrade':
raise Exception('This lakeapi version is outdated. Please upgrade lakeapi to the latest version, e.g. with `pip install -U lakeapi`!')

def partition_filter(partition: Dict[str, str]) -> bool:
return (
@@ -110,47 +124,54 @@ def partition_filter(partition: Dict[str, str]) -> bool:
if exchanges:
assert exchanges[0].upper() == exchanges[0]

with botocache_context(
cache=cache,
action_regex_to_cache=["List.*"],
# Logs all calls made to AWS; useful while debugging. Defaults to False.
call_log=True,
# This suppresses warning messages encountered while caching in anonymous mode
supress_warning_message=is_anonymous_access,
):
last_ex = None
for _ in range(2):
try:
# TODO: log & skip corrupted files
df = lakeapi._read_parquet.read_parquet(
path=f"s3://{bucket}/{table}/",
partition_filter=partition_filter,
categories=["side"] if table == "trades" else None,
dataset=True, # also adds partition columns
boto3_session=boto3_session,
columns=columns,
use_threads=use_threads,
ignore_index=True,
)
break
except botocore.exceptions.ClientError as ex:
# When a 404 not-found error happens, the boto cache of available files is stale,
# so we clear it and try again.
if int(ex.response['Error']['Code']) == 404:
# An error occurred (404) when calling the HeadObject operation: Not Found
cache.clear()
last_ex = ex
continue
else:
raise
except lakeapi.exceptions.NoFilesFound:
if is_anonymous_access:
raise lakeapi.exceptions.NoFilesFound("No data found for your query in the free sample dataset. Please subscribe to access more data.")
else:
raise
else:
# got error 404 both before and after the cache.clear()
raise last_ex
if method == 'cloudfront':
df = _load_data_cloudfront(
table = table, start = start, end = end, symbols = symbols, exchanges = exchanges,
boto3_session = boto3_session, use_threads = use_threads, username = username
)
else:
with botocache_context(
cache=cache,
action_regex_to_cache=["List.*"],
# Logs all calls made to AWS; useful while debugging. Defaults to False.
call_log=True,
# This suppresses warning messages encountered while caching in anonymous mode
supress_warning_message=is_anonymous_access,
):
last_ex = None
for _ in range(2):
try:
# TODO: log & skip corrupted files
df = lakeapi._read_parquet.read_parquet(
path=f"s3://{bucket}/{table}/",
partition_filter=partition_filter,
categories=["side"] if table == "trades" else None,
dataset=True, # also adds partition columns
boto3_session=boto3_session,
columns=columns,
use_threads=use_threads,
ignore_index=True,
)
break
except botocore.exceptions.ClientError as ex:
# When a 404 not-found error happens, the boto cache of available files is stale,
# so we clear it and try again.
if int(ex.response['Error']['Code']) == 404:
# An error occurred (404) when calling the HeadObject operation: Not Found
cache.clear()
last_ex = ex
continue
else:
raise
except lakeapi.exceptions.NoFilesFound:
if is_anonymous_access:
raise lakeapi.exceptions.NoFilesFound("No data found for your query in the free sample dataset. Please subscribe to access more data.")
else:
raise
else:
# got error 404 both before and after the cache.clear()
raise last_ex

if drop_partition_cols:
# useful when loading just one symbol and exchange
df.drop(columns=["symbol", "exchange", "dt"], inplace=True)
@@ -177,6 +198,132 @@ def partition_filter(partition: Dict[str, str]) -> bool:
lakeapi._cache._store.reduce_size()
return df


@cachetools.cached(cache=FSLRUCache(maxsize=32, path = '.lake_cache/login', ttl=3600), key=lambda sess, table: f'table={table}')
def _login(boto3_session: boto3.Session, table: str) -> Tuple[str, str]:
''' Return the username and the transfer method to use for downloading this table. '''
lambda_client = boto3_session.client('lambda')
try:
response = lambda_client.invoke(
FunctionName='lake-backend-dev',
InvocationType='RequestResponse',
Payload=json.dumps({
'command': 'app.login',
'api_key': boto3_session.get_credentials().access_key,
'table': table,
'anonymous_access': is_anonymous_access,
'user_agent': f'lakeapi/{lakeapi.__version__}'
}),
)

# Read the response from the Lambda function
response_payload = json.loads(response['Payload'].read().decode('utf-8'))

return response_payload['username'], response_payload['method']
except Exception:
    # Login is best-effort: fall back to anonymous direct S3 access on any failure.
    return 'unknown', 's3'

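# The backend reply consumed above is JSON carrying at least the two keys read
# here, e.g. {"username": "alice", "method": "cloudfront"} (values illustrative).
# 'method' selects the download path in load_data():
#   'cloudfront'   -> _load_data_cloudfront() over data.crypto-lake.com
#   'upgrade'      -> warn that this lakeapi is outdated, then use the S3 path
#   'forceupgrade' -> raise and refuse to continue
#   anything else  -> regular S3 path (also the fallback when login fails)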

def _load_data_cloudfront(
table: DataType,
start: Optional[datetime.datetime] = None,
end: Optional[datetime.datetime] = None,
symbols: Optional[List[str]] = None,
exchanges: Optional[List[str]] = None,
*,
boto3_session: Optional[boto3.Session] = None,
use_threads: bool = True,
username: str = 'unknown',
# drop_partition_cols: bool = False,
) -> pd.DataFrame:
if boto3_session is None:
boto3_session = boto3.Session(region_name="eu-west-1")

# credentials = boto3_session.get_credentials().get_frozen_credentials()
# auth = AWSRequestsAuth(
# aws_access_key=credentials.access_key,
# aws_secret_access_key=credentials.secret_key,
# aws_host='s3.amazonaws.com',
# aws_region='eu-west-1',
# aws_service='lambda'
# )
# login_response = requests.get(login_api_url, auth = auth, params={'api_key': credentials.access_key})
# try:
# username = _login(boto3_session)
# except:
# username = 'anonymous'

# if not exchanges or not symbols or not start or not end:
available_data = list_data(table = table, start = start, end = end, symbols = symbols, exchanges = exchanges, boto3_session = boto3_session)
df = pd.DataFrame(available_data)
if not exchanges:
exchanges = list(df['exchange'].unique())
if not symbols:
symbols = list(df['symbol'].unique())
if not start:
start = datetime.datetime.strptime(df['dt'].min(), '%Y-%m-%d')
if not end:
end = datetime.datetime.strptime(df['dt'].max(), '%Y-%m-%d')
# else:
# df = None

assert exchanges and symbols and start and end
# for start in dateutil.rrule.rrule(freq = dateutil.rrule.DAILY, dtstart = start.date(), until = end.date()):
# for exchange in exchanges:
# for symbol in symbols:
# url = f'https://data.crypto-lake.com/market-data/cryptofeed/{table}/exchange={exchange}/symbol={symbol}/dt={start.date()}/1.snappy.parquet'

objs = []
for row in df.itertuples():
url = f'https://data.crypto-lake.com/market-data/cryptofeed/{table}/exchange={row.exchange}/symbol={row.symbol}/dt={row.dt}/{row.filename}'
objs.append((url, row.symbol, row.exchange, row.dt))

if use_threads and os.cpu_count():
workers = os.cpu_count() + 2
else:
workers = 1

dfs = list(tqdm.contrib.concurrent.thread_map(
functools.partial(_download_cloudfront, boto3_session, username, symbols, exchanges),
objs,
max_workers=workers
))
non_empty = [df for df in dfs if not df.empty]
if not non_empty:
    return pd.DataFrame()  # nothing matched the query
return pd.concat(non_empty, ignore_index=True)

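# Each file is fetched over CloudFront from a URL of this shape (values illustrative):
#   https://data.crypto-lake.com/market-data/cryptofeed/trades/exchange=BINANCE/symbol=BTC-USDT/dt=2023-11-05/1.snappy.parquet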

def _download_cloudfront(session: boto3.Session, username: str, all_symbols: List[str], all_exchanges: List[str], obj) -> pd.DataFrame:
url, symbol, exchange, dt = obj
credentials = session.get_credentials().get_frozen_credentials()
auth = AWSRequestsAuth(
aws_access_key=credentials.access_key,
aws_secret_access_key=credentials.secret_key,
aws_host='qnt.data.s3.amazonaws.com',
aws_region='eu-west-1',
aws_service='s3'
)
df = _download_cached(auth, url, username)
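# Pin the full category sets up front so dtypes match across files and the
# pd.concat() in _load_data_cloudfront keeps these columns categorical
# instead of upcasting them to object.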
if 'side' in df.columns:  # only trades carry a side column
    df['side'] = pd.Series(df['side'], index=df.index, dtype=pd.CategoricalDtype(categories = ['buy', 'sell']))
df['symbol'] = pd.Series(symbol, index=df.index, dtype=pd.CategoricalDtype(categories = all_symbols))
df['exchange'] = pd.Series(exchange, index=df.index, dtype=pd.CategoricalDtype(categories = all_exchanges))
df['dt'] = 0 # this will be deleted anyway
return df

@lakeapi._cache.cached(ignore = ['auth', 'username'])
def _download_cached(auth, url: str, username: str) -> pd.DataFrame:
# Use stream to be able to process response.raw into parquet faster
# response = requests.get(url, auth = auth, headers = {'Referer': username, 'User-Agent': f'lakeapi/{lakeapi.__version__}'}, stream = True)
# return pd.read_parquet(lakeapi.response_stream.ResponseStream(response.iter_content(chunk_size=1_000_000)), engine='pyarrow')

response = requests.get(url, auth = auth, headers = {'Referer': username, 'User-Agent': f'lakeapi/{lakeapi.__version__}'})

if response.status_code == 404:
return pd.DataFrame()
elif response.status_code != 200:
print('Warning: Unexpected status code', response.status_code, 'for', url)

return pd.read_parquet(io.BytesIO(response.content), engine='pyarrow')

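# The on-disk cache above ignores 'auth' and 'username', so entries are keyed by
# URL alone and each file is fetched at most once per cache lifetime. A direct
# call would look like this sketch (credentials and URL illustrative; normally
# only load_data() drives these helpers):
#
#   auth = AWSRequestsAuth(aws_access_key='AKIA...', aws_secret_access_key='...',
#                          aws_host='qnt.data.s3.amazonaws.com',
#                          aws_region='eu-west-1', aws_service='s3')
#   df = _download_cached(auth, 'https://data.crypto-lake.com/market-data/...', 'alice')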

def list_data(
table: Optional[DataType],
start: Optional[datetime.datetime] = None,
@@ -272,14 +419,16 @@ def available_symbols(
# Test
# df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 3), end = None, symbols = ['BTC-USDT'], exchanges = ['BINANCE']) # noqa
# df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = None, exchanges = ['BINANCE']) # noqa
use_sample_data(True)
df = load_data(
table="book",
start=None, #datetime.datetime.now() - datetime.timedelta(days=2),
end=None,
symbols=["FTRB-USDT"],
exchanges=None,
)
df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = ['XCAD-USDT'], exchanges = None) # noqa
# df = load_data(table = 'book', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = ['XCAD-USDT'], exchanges = ['KUCOIN']) # noqa
# df = _load_data_cloudfront(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = ['XCAD-USDT'], exchanges = None) # noqa
# df = load_data(
# table="book",
# start=None, #datetime.datetime.now() - datetime.timedelta(days=2),
# end=None,
# symbols=["FTRB-USDT"],
# exchanges=None,
# )
pd.set_option("display.width", 1000)
pd.set_option("display.max_columns", 30)
print(df)
1 change: 1 addition & 0 deletions requirements.txt
@@ -21,3 +21,4 @@ tqdm==4.64.1
# wheel==0.41.1
pyarrow==12.0.0; python_version < '3.12'
pyarrow==14.0.0; python_version >= '3.12'
aws-requests-auth==0.4.3
2 changes: 1 addition & 1 deletion setup.py
@@ -12,7 +12,7 @@

requirements = [
'pandas>=1.0.5', 'boto3>=1.24,<2', 'cachetools_ext>=0.0.8,<0.1.0', 'botocache>=0.0.4,<0.1.0',
'joblib>=1.0.0', 'tqdm>=4,<5', 'pyarrow>=2.0.0,<15',
'joblib>=1.0.0', 'tqdm>=4,<5', 'pyarrow>=2.0.0,<15', 'aws-requests-auth==0.4.3',
'typing-extensions>=4.0,<5; python_version < \'3.8\''
]

