
improve loading performance
Jan Škoda committed Aug 6, 2024
1 parent 009cb97 commit 5580db9
Showing 3 changed files with 50 additions and 22 deletions.
5 changes: 5 additions & 0 deletions HISTORY.rst
@@ -2,6 +2,11 @@
History
=======

0.16.0 (2024-08-06)
-------------------

* improve data loading performance, especially on the first (cold-cache) load

0.15.0 (2024-07-24)
-------------------

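For context, the loading path this entry refers to is the one behind an ordinary load_data call: on a cold cache, resolving the file listing for a query like the one below no longer needs a full recursive S3 List. A minimal usage sketch, with parameters mirroring the examples at the bottom of lakeapi/main.py (the package-level lakeapi.load_data import is assumed):

import datetime

import lakeapi

# The first call now resolves the file listing from a precompressed
# contents.json.gz object instead of a slow recursive S3 List.
df = lakeapi.load_data(
    table='trades',
    start=datetime.datetime.now() - datetime.timedelta(days=2),
    end=None,
    symbols=['BTC-USDT'],
    exchanges=['BINANCE'],
)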
65 changes: 44 additions & 21 deletions lakeapi/main.py
@@ -10,6 +10,7 @@
import io
import json
import warnings
import zlib

import boto3
import botocore
@@ -400,31 +401,53 @@ def partition_filter(partition: Dict[str, str]) -> bool:
            and (exchanges is None or partition["exchange"] in exchanges)
        )

    path = f"s3://{bucket}/{table}/"
    with botocache_context(
        cache=cache,
        action_regex_to_cache=["List.*"],
        # This helps in logging all calls made to AWS. Useful while debugging. Default value is False.
        call_log=True,
        # This suppresses warning messages encountered while caching. Default value is False.
        supress_warning_message=False,
    ):
        paths = lakeapi._read_parquet._path2list(
            path=path,
            boto3_session=boto3_session,
            # suffix=path_suffix,
            # ignore_suffix=_get_path_ignore_suffix(path_ignore_suffix=path_ignore_suffix),
            last_modified_begin=last_modified_begin,
            last_modified_end=last_modified_end,
            # ignore_empty=ignore_empty,
            s3_additional_kwargs={},
        )
    path_root = lakeapi._read_parquet._get_path_root(path=path, dataset=True)
    if table:
        path = f"s3://{bucket}/{table}/"
    else:
        path = f"s3://{bucket}/"
    path_root = lakeapi._read_parquet._get_path_root(path=path, dataset=True)
    paths = []

    # Fast path: for the default bucket with no time filters, reuse the
    # precomputed object listing (contents.json.gz) instead of listing S3.
    if bucket == default_bucket and not last_modified_begin and not last_modified_end and table:
        paths = _get_table_contents_cache_key(boto3_session, bucket, table)

    # Slow path: list the objects in S3, with List calls cached by botocache.
    if not paths:
        with botocache_context(
            cache=cache,
            action_regex_to_cache=["List.*"],
            # This helps in logging all calls made to AWS. Useful while debugging. Default value is False.
            call_log=True,
            # This suppresses warning messages encountered while caching. Default value is False.
            supress_warning_message=False,
        ):
            paths = lakeapi._read_parquet._path2list(
                path=path,
                boto3_session=boto3_session,
                # suffix=path_suffix,
                # ignore_suffix=_get_path_ignore_suffix(path_ignore_suffix=path_ignore_suffix),
                last_modified_begin=last_modified_begin,
                last_modified_end=last_modified_end,
                # ignore_empty=ignore_empty,
                s3_additional_kwargs={},
            )

    paths = lakeapi._read_parquet._apply_partition_filter(path_root=path_root, paths=paths, filter_func=partition_filter)
    if len(paths) < 1:
        raise lakeapi.exceptions.NoFilesFound(f"No files found on: {path}.")
    return [_path_to_dict(path) for path in paths]

@cachetools.cached(cache=FSLRUCache(maxsize=32, path='.lake_cache/contents', ttl=3600), key=lambda sess, bucket, table: f'bucket={bucket[:7]}-table={table}')
def _get_table_contents_cache_key(boto3_session: boto3.Session, bucket: str, table: str):
    # Fetch the precomputed object listing for a table from contents.json.gz,
    # avoiding a slow recursive S3 List on a cold cache. Results are kept in
    # an on-disk LRU cache for an hour; returns None on any failure so that
    # callers fall back to the slower listing path.
    try:
        s3 = boto3_session.client('s3')
        s3_bucket, prefix = bucket.split('/', 1)
        obj = s3.get_object(Bucket=s3_bucket, Key=f'{prefix}/{table}/contents.json.gz')
        paths = json.loads(zlib.decompress(obj['Body'].read()).decode('utf-8'))['objects']
        paths = [f's3://{s3_bucket}/{path}' for path in paths]
        return paths
    except Exception as ex:
        print('Warning: error while fetching from contents cache, using slower method', ex)
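The layout of contents.json.gz is implied by the reader above: despite the .gz suffix, the body is decompressed with plain zlib and holds a JSON document whose 'objects' key lists bucket-relative keys. A hypothetical producer-side sketch (bucket name, key layout, and file names are illustrative, not part of this commit):

import json
import zlib

import boto3

# Hypothetical: build the listing in the shape the reader above expects,
# i.e. a zlib-compressed JSON body of the form {"objects": [<key>, ...]}.
objects = [
    'prefix/trades/exchange=BINANCE/symbol=BTC-USDT/dt=2024-08-05/data.snappy.parquet',
]
body = zlib.compress(json.dumps({'objects': objects}).encode('utf-8'))

s3 = boto3.client('s3')
s3.put_object(Bucket='example-bucket', Key='prefix/trades/contents.json.gz', Body=body)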

def _path_to_dict(path: str) -> Dict[str, Any]:
    *_, table, exchange, symbol, dt, filename = path.split('/')
    return {
@@ -461,7 +484,7 @@ def available_symbols(
# df = load_data(table = 'trades_mpid', start = datetime.datetime.now() - datetime.timedelta(days = 3), end = None, symbols = ['stSOL-USDC'], exchanges = ['SERUM']) # noqa
# df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = None, exchanges = ['BINANCE']) # noqa
# df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 4), end = None, symbols = ['TEST-USDT'], exchanges = None) #, boto3_session=session) # noqa
df = load_data(table = 'funding', start = datetime.datetime.now() - datetime.timedelta(days = 465), end = datetime.datetime.now() - datetime.timedelta(days = 464), symbols = ['BTC-USDT-PERP'], exchanges = ['BINANCE_FUTURES']) #, boto3_session=session) # noqa
df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 465), end = datetime.datetime.now() - datetime.timedelta(days = 464), symbols = ['BTC-USDT-PERP'], exchanges = ['BINANCE_FUTURES']) #, boto3_session=session) # noqa
# df = load_data(table = 'book', start = datetime.datetime.now() - datetime.timedelta(days = 1), end = None, symbols = ['BTC-USDT'], exchanges = ['BINANCE']) # noqa
# df = _load_data_cloudfront(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = ['XCAD-USDT'], exchanges = None) # noqa
# df = load_data(
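A side note on the caching decorator used above: FSLRUCache is lakeapi's own on-disk cache and is not part of this diff, but the decorator pattern it plugs into is plain cachetools. A self-contained sketch of the same pattern, with the stock in-memory cachetools.TTLCache standing in for FSLRUCache (the function and its names are illustrative):

import cachetools

# Entries expire after an hour; the key function deliberately ignores the
# unhashable session argument, keying only on bucket and table.
_cache = cachetools.TTLCache(maxsize=32, ttl=3600)

@cachetools.cached(cache=_cache, key=lambda session, bucket, table: f'bucket={bucket[:7]}-table={table}')
def list_table(session, bucket: str, table: str):
    print('cache miss: doing the expensive lookup')
    return [f's3://{bucket}/{table}/example.parquet']

list_table(None, 'example-bucket', 'trades')  # miss: prints and computes
list_table(None, 'example-bucket', 'trades')  # hit: returns the cached listing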
2 changes: 1 addition & 1 deletion requirements.txt
@@ -4,7 +4,7 @@ flake8==3.7.8
tox==3.26.0
coverage==4.5.4
Sphinx==5.2.3
twine==4.0.1
twine==5.1.1
# Click==7.1.2
pytest==7.1.3
pytest-benchmark==4.0.0
