Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cuda categorical concat #195

Merged
merged 7 commits into from
Feb 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions _scripts/install-cudf-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@ mv go goroot
echo 'export GOROOT=${HOME}/goroot' >> ~/.bashrc
echo 'export PATH=${GOROOT}/bin:${PATH}' >> ~/.bashrc

CONDA_INSTALL="${HOME}/miniconda3/bin/conda install -y"

# Install cudf
~/miniconda3/bin/conda install \
${CONDA_INSTALL} \
-c nvidia -c rapidsai -c pytorch -c numba \
-c conda-forge -c defaults \
cudf=0.5 cuml=0.5 python=3.6
~/miniconda3/bin/conda install cudatoolkit=9.2
${CONDA_INSTALL} cudatoolkit=9.2

# Install testing
~/miniconda3/bin/conda install pytest pyyaml
${CONDA_INSTALL} pytest pyyaml

# Get frames code
git clone https://github.com/v3io/frames.git

# Install frames dependencies
conda install grpcio-tools=1.16.1 protobuf=3.6.1 requests=2.21.0
12 changes: 12 additions & 0 deletions clients/py/environment-cudf.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
channels:
- conda-forge
- defaults
- numba
- nvidia
- pytorch
- rapidsai
dependencies:
- cudf=0.5
- cuml=0.5
- python=3.6
- cudatoolkit=9.2
7 changes: 7 additions & 0 deletions clients/py/environment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
channels:
- defaults
dependencies:
- grpcio-tools=1.16.1
- protobuf=3.6.1
- requests=2.21.0
- pandas>=0.23.*
15 changes: 15 additions & 0 deletions clients/py/tests/test_cudf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from time import sleep, time

import pandas as pd
import pytest

import v3io_frames as v3f
Expand Down Expand Up @@ -45,3 +46,17 @@ def test_cudf(framesd, session):
assert isinstance(rdf, cudf.DataFrame), 'not a cudf.DataFrame'
assert len(rdf) == len(df), 'wrong frame size'
assert set(rdf.columns) == set(df.columns), 'columns mismatch'


@pytest.mark.skipif(not has_cudf, reason='cudf not found')
def test_concat_categorical():
    """Check that concat_dfs keeps a categorical column categorical.

    Builds two three-row cudf frames whose 'c' column holds a different
    single category each, concatenates them through
    ``v3f.pdutils.concat_dfs`` and verifies both the resulting length and
    the dtype of the 'c' column.
    """
    frames = []
    for a_start, b_start, label in ((10, 50, 'a'), (20, 60, 'b')):
        frame = cudf.DataFrame({
            'a': range(a_start, a_start + 3),
            'b': range(b_start, b_start + 3),
        })
        # Each frame carries exactly one category value in column 'c'.
        frame['c'] = pd.Series([label] * 3, dtype='category')
        frames.append(frame)

    merged = v3f.pdutils.concat_dfs(frames, cudf.DataFrame, cudf.concat)

    expected_size = sum(len(frame) for frame in frames)
    assert len(merged) == expected_size, 'bad concat size'

    result_dtype = merged['c'].dtype
    is_categorical = v3f.pdutils.is_categorical_dtype(result_dtype)
    assert is_categorical, 'result not categorical'
7 changes: 5 additions & 2 deletions clients/py/v3io_frames/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@


def Client(address='', data_url='', container='', path='', user='',
password='', token='', session_id='', frame_factory=pd.DataFrame):
password='', token='', session_id='', frame_factory=pd.DataFrame,
concat=pd.concat):
"""Return a new client.

Parameters
Expand All @@ -60,6 +61,8 @@ def Client(address='', data_url='', container='', path='', user='',
Session ID (session info)
frame_factory : class
DataFrame factory
concat : function
Function to concat DataFrames
"""
protocol = urlparse(address).scheme or 'grpc'
if protocol not in _known_protocols:
Expand All @@ -78,7 +81,7 @@ def Client(address='', data_url='', container='', path='', user='',
)

cls = gRPCClient if protocol == 'grpc' else HTTPClient
return cls(address, session, frame_factory=frame_factory)
return cls(address, session, frame_factory=frame_factory, concat=concat)


def session_from_env():
Expand Down
6 changes: 5 additions & 1 deletion clients/py/v3io_frames/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@


class ClientBase:
def __init__(self, address, session, frame_factory=pd.DataFrame):
def __init__(self, address, session, frame_factory=pd.DataFrame,
concat=pd.concat):
"""Create new client

Parameters
Expand All @@ -35,13 +36,16 @@ def __init__(self, address, session, frame_factory=pd.DataFrame):
Session object
frame_factory : class
DataFrame factory (currently pandas and cudf supported)
concat : function
Function to concat DataFrames
"""
address = address or environ.get('V3IO_FRAMESD')
if not address:
raise ValueError('empty address')
self.address = self._fix_address(address)
self.session = session
self.frame_factory = frame_factory
self.concat = concat

def read(self, backend='', table='', query='', columns=None, filter='',
group_by='', limit=0, data_format='', row_layout=False,
Expand Down
2 changes: 1 addition & 1 deletion clients/py/v3io_frames/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _read(self, backend, table, query, columns, filter, group_by, limit,
backend, table, query, columns, filter, group_by, limit,
data_format, row_layout, max_in_message, marker, **kw)
if not iterator:
return concat_dfs(dfs)
return concat_dfs(dfs, self.frame_factory, self.concat)
return dfs

@grpc_raise(WriteError)
Expand Down
2 changes: 1 addition & 1 deletion clients/py/v3io_frames/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _read(self, backend, table, query, columns, filter, group_by, limit,
dfs = self._iter_dfs(resp.raw)

if not iterator:
return concat_dfs(dfs)
return concat_dfs(dfs, self.frame_factory, self.concat)
return dfs

@connection_error(WriteError)
Expand Down
22 changes: 11 additions & 11 deletions clients/py/v3io_frames/pdutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@
from .pbutils import is_categorical_dtype


def concat_dfs(dfs, frame_factory=pd.DataFrame):
def concat_dfs(dfs, frame_factory=pd.DataFrame, concat=pd.concat):
"""Concat sequence of DataFrames, can handle MultiIndex frames."""
dfs = list(dfs)
if not dfs:
return frame_factory()

if not isinstance(dfs[0], pd.DataFrame):
import cudf
return cudf.concat(dfs)

# Make sure concat keep categorical columns
# See https://stackoverflow.com/a/44086708/7650
align_categories(dfs)

names = list(dfs[0].index.names)
wdf = pd.concat(
if hasattr(dfs[0].index, 'names'):
names = list(dfs[0].index.names)
else:
names = [dfs[0].index.name]
had_index = 'index' in dfs[0].columns

wdf = concat(
[df.reset_index() for df in dfs],
ignore_index=True,
sort=False,
)

if len(names) > 1:
Expand All @@ -48,7 +48,8 @@ def concat_dfs(dfs, frame_factory=pd.DataFrame):
elif names[0]:
wdf = wdf.set_index(names[0])
elif names[0] is None:
del wdf['index'] # Pandas will add 'index' column
if not had_index and 'index' in wdf.columns:
del wdf['index'] # Pandas will add 'index' column

with warnings.catch_warnings():
warnings.simplefilter('ignore')
Expand All @@ -66,5 +67,4 @@ def align_categories(dfs):
for df in dfs:
for col in df.columns:
if is_categorical_dtype(df[col].dtype):
cats = all_cats - set(df[col].cat.categories)
df[col].cat.add_categories(cats, inplace=True)
df[col] = df[col].cat.set_categories(all_cats)