Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the example code of how to accelerate the data sharing with vineyard on fluid platform. #1776

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import fluid

from fluid import constants
from fluid import models

# Use the default kubeconfig file to connect to the Fluid control plane

Check notice on line 6 in k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py#L6

Trailing whitespace
# and create a Fluid client instance
client_config = fluid.ClientConfig()
fluid_client = fluid.FluidClient(client_config)

# Single name shared by the dataset creation and lookup calls below.
dataset_name = "vineyard"

# Create the dataset in the default namespace; the mount name and mount
# point are placeholder values.
fluid_client.create_dataset(
    dataset_name=dataset_name,
    mount_name="dummy-mount-name",
    mount_point="dummy-mount-point",
)

# Look the freshly created dataset up again to obtain its instance.
dataset = fluid_client.get_dataset(dataset_name=dataset_name)

# Configure a vineyard runtime and bind it to the dataset instance:
# 2 replicas with 30Gi of memory ("MEM") as the cache medium.
dataset.bind_runtime(
    runtime_type=constants.VINEYARD_RUNTIME_KIND,
    replicas=2,
    cache_capacity_GiB=30,
    cache_medium="MEM",
    wait=True,
)

# Shell script of the data preprocessing step. It installs the Python
# dependencies, writes ./preprocess.py through a heredoc and runs it.
# The generated program loads /data/df.pkl, drops the rows whose GrLivArea
# exceeds 4800, splits the data into train/test sets and puts the four
# resulting objects into vineyard under fixed names.
#
# `set -ex` is placed before `pip3 install` so that a failed installation
# aborts the step, and the heredoc delimiter is quoted ('EOF') so that bash
# copies the Python source verbatim instead of expanding `$` or backticks.
preprocess_data_script = """#!/bin/bash
set -ex

pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2

cat <<'EOF' > ./preprocess.py
from sklearn.model_selection import train_test_split

import pandas as pd
import vineyard

df = pd.read_pickle('/data/df.pkl')

# Preprocess Data
df = df.drop(df[(df['GrLivArea']>4800)].index)
X = df.drop('SalePrice', axis=1) # Features
y = df['SalePrice'] # Target variable

del df

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

del X, y

vineyard.put(X_train, name="x_train", persist=True)
vineyard.put(X_test, name="x_test", persist=True)
vineyard.put(y_train, name="y_train", persist=True)
vineyard.put(y_test, name="y_test", persist=True)

EOF

python3 ./preprocess.py
"""

# Shell script of the model training step. It installs the Python
# dependencies, writes ./train.py through a heredoc and runs it.
# The generated program fetches "x_train" and "y_train" from vineyard, fits
# a LinearRegression model on them and dumps it to /data/model.pkl.
#
# `set -ex` is placed before `pip3 install` so that a failed installation
# aborts the step, and the heredoc delimiter is quoted ('EOF') so that bash
# copies the Python source verbatim instead of expanding `$` or backticks.
train_data_script = """#!/bin/bash
set -ex

pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2

cat <<'EOF' > ./train.py
from sklearn.linear_model import LinearRegression

import joblib
import pandas as pd
import vineyard

x_train_data = vineyard.get(name="x_train", fetch=True)
y_train_data = vineyard.get(name="y_train", fetch=True)

model = LinearRegression()
model.fit(x_train_data, y_train_data)

joblib.dump(model, '/data/model.pkl')

EOF

python3 ./train.py
"""

# Shell script of the model testing step. It installs the Python
# dependencies, writes ./test.py through a heredoc and runs it.
# The generated program fetches "x_test" and "y_test" from vineyard, loads
# the model from /data/model.pkl, computes the mean squared error of its
# predictions and appends that value to /data/output.txt.
#
# `set -ex` is placed before `pip3 install` so that a failed installation
# aborts the step, and the heredoc delimiter is quoted ('EOF') so that bash
# copies the Python source verbatim instead of expanding `$` or backticks.
# The result is written with print() so that every run appends its own
# newline-terminated line instead of gluing numbers together.
test_data_script = """#!/bin/bash
set -ex

pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2

cat <<'EOF' > ./test.py
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

import vineyard
import joblib
import pandas as pd

x_test_data = vineyard.get(name="x_test", fetch=True)
y_test_data = vineyard.get(name="y_test", fetch=True)

model = joblib.load("/data/model.pkl")
y_pred = model.predict(x_test_data)

err = mean_squared_error(y_test_data, y_pred)

with open('/data/output.txt', 'a') as f:
    print(err, file=f)

EOF

python3 ./test.py
"""

from kubernetes.client import models as k8s_models

Check notice on line 121 in k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py#L121

Import "from kubernetes.client import models as k8s_models" should be placed at the top of the module
# define the template of the task processor and mount the OSS Volume
def create_processor(script):
    """Build the task processor that runs ``script``.

    The script is passed as the source of a script processor whose command
    is ``bash`` and whose image is ``python`` with tag ``3.10``. The
    ``pvc-oss`` persistent volume claim is attached as the ``data`` volume
    and mounted at ``/data``.
    """
    data_volume = k8s_models.V1Volume(
        name="data",
        persistent_volume_claim=k8s_models.V1PersistentVolumeClaimVolumeSource(
            claim_name="pvc-oss",
        ),
    )
    data_mount = k8s_models.V1VolumeMount(name="data", mount_path="/data")

    script_processor = models.ScriptProcessor(
        command=["bash"],
        source=script,
        image="python",
        image_tag="3.10",
        volumes=[data_volume],
        volume_mounts=[data_mount],
    )

    # When enabling fuse affinity scheduling, add the following argument
    # to models.Processor to achieve the best performance of data processing:
    #     pod_metadata=models.PodMetadata(
    #         labels={"fuse.serverful.fluid.io/inject": "true"},
    #     ),
    return models.Processor(script=script_processor)

# One processor per stage of the workflow.
preprocess_processor = create_processor(preprocess_data_script)
train_processor = create_processor(train_data_script)
test_processor = create_processor(test_data_script)

# Linear regression workflow, chained in execution order:
# data preprocessing -> model training -> model testing.
# Every stage mounts the dataset at "/var/run", the default path of the
# vineyard configuration file.
flow = (
    dataset
    .process(processor=preprocess_processor, dataset_mountpath="/var/run")
    .process(processor=train_processor, dataset_mountpath="/var/run")
    .process(processor=test_processor, dataset_mountpath="/var/run")
)

# Submit the workflow to the Fluid platform and block until it finishes.
run = flow.run(run_id="linear-regression-with-vineyard")
run.wait()

# Release every resource created by this example.
dataset.clean_up(wait=True)
65 changes: 65 additions & 0 deletions k8s/examples/vineyard-on-fluid/prepare-dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import numpy as np
import pandas as pd

# generate a dataframe with size around 22G
num_rows = 6000 * 10000

# Sampling range of every column as (low, high). The values are passed
# straight to np.random.randint, which draws integers from the half-open
# interval [low, high). The dict order is the column order of the dataframe.
column_ranges = {
    'Id': (1, 100000),
    'MSSubClass': (20, 201),
    'LotFrontage': (50, 151),
    'LotArea': (5000, 20001),
    'OverallQual': (1, 11),
    'OverallCond': (1, 11),
    'YearBuilt': (1900, 2022),
    'YearRemodAdd': (1900, 2022),
    'MasVnrArea': (0, 1001),
    'BsmtFinSF1': (0, 2001),
    'BsmtFinSF2': (0, 1001),
    'BsmtUnfSF': (0, 2001),
    'TotalBsmtSF': (0, 3001),
    '1stFlrSF': (500, 4001),
    # Spelled '2andFlrSF' before; fixed to match the '1stFlrSF' naming.
    '2ndFlrSF': (0, 2001),
    'LowQualFinSF': (0, 201),
    'GrLivArea': (600, 5001),
    'BsmtFullBath': (0, 4),
    'BsmtHalfBath': (0, 3),
    'FullBath': (0, 5),
    'HalfBath': (0, 3),
    'BedroomAbvGr': (0, 11),
    'KitchenAbvGr': (0, 4),
    'TotRmsAbvGrd': (0, 16),
    'Fireplaces': (0, 4),
    'GarageYrBlt': (1900, 2022),
    'GarageCars': (0, 5),
    'GarageArea': (0, 1001),
    'WoodDeckSF': (0, 501),
    'OpenPorchSF': (0, 301),
    'EnclosedPorch': (0, 201),
    '3SsnPorch': (0, 101),
    'ScreenPorch': (0, 201),
    'PoolArea': (0, 301),
    'MiscVal': (0, 5001),
    'TotalRooms': (2, 11),
    'GarageAge': (1, 31),
    'RemodAge': (1, 31),
    'HouseAge': (1, 31),
    'TotalBath': (1, 5),
    'TotalPorchSF': (1, 1001),
    'TotalSF': (1000, 6001),
    'TotalArea': (1000, 6001),
    'MoSold': (1, 13),
    'YrSold': (2006, 2022),
    'SalePrice': (50000, 800001),
}

# One random integer column of num_rows values per entry of the table.
df = pd.DataFrame({
    column: np.random.randint(low, high, size=num_rows)
    for column, (low, high) in column_ranges.items()
})

import oss2

Check notice on line 55 in k8s/examples/vineyard-on-fluid/prepare-dataset.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

k8s/examples/vineyard-on-fluid/prepare-dataset.py#L55

Import "import oss2" should be placed at the top of the module
import io

Check notice on line 56 in k8s/examples/vineyard-on-fluid/prepare-dataset.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

k8s/examples/vineyard-on-fluid/prepare-dataset.py#L56

Import "import io" should be placed at the top of the module

Check notice on line 56 in k8s/examples/vineyard-on-fluid/prepare-dataset.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

k8s/examples/vineyard-on-fluid/prepare-dataset.py#L56

standard import "import io" should be placed before "import numpy as np"
from oss2.credentials import EnvironmentVariableCredentialsProvider

Check notice on line 57 in k8s/examples/vineyard-on-fluid/prepare-dataset.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

k8s/examples/vineyard-on-fluid/prepare-dataset.py#L57

Import "from oss2.credentials import EnvironmentVariableCredentialsProvider" should be placed at the top of the module
# Please set your OSS accessKeyID and accessKeySecret as environment variables OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# Please replace OSS_ENDPOINT and BUCKET_NAME with your OSS Endpoint and Bucket
bucket = oss2.Bucket(auth, 'OSS_ENDPOINT', 'BUCKET_NAME')

# Pickle the dataframe into an in-memory buffer and upload it as "df.pkl".
bytes_buffer = io.BytesIO()
df.to_pickle(bytes_buffer)
# Rewind the buffer and hand it to put_object as a file-like object:
# getvalue() would build a second full copy of the pickled dataframe in
# memory just for the upload.
bytes_buffer.seek(0)
# NOTE(review): OSS simple upload is presumably limited to 5 GB per object;
# confirm whether a dataframe of this size needs a multipart upload instead.
bucket.put_object("df.pkl", bytes_buffer)
Loading