[Do not merge] Spike upgrading comps algorithm with taichi #236

Open · wants to merge 13 commits into master
7 changes: 4 additions & 3 deletions .github/workflows/build-and-run-model.yaml
@@ -69,11 +69,12 @@ jobs:
       # required in order to allow the reusable called workflow to push to
       # GitHub Container Registry
       packages: write
-    uses: ccao-data/actions/.github/workflows/build-and-run-batch-job.yaml@main
+    uses: ccao-data/actions/.github/workflows/build-and-run-batch-job.yaml@jeancochrane/spike-c5-instances-for-build-and-run-batch-job
     with:
+      ref: jeancochrane/spike-c5-instances-for-build-and-run-batch-job
       backend: "ec2"
-      vcpu: "40"
-      memory: "158000"
+      vcpu: "90"
+      memory: "180000"
       # Maximum pipeline runtime. This is slightly below 6 hours, which
       # is the maximum length of any single GitHub Actions job
       role-duration-seconds: 21000
45 changes: 8 additions & 37 deletions Dockerfile
@@ -1,50 +1,21 @@
-FROM rocker/r-ver:4.3.2
-
-# Set the working directory to setup. Uses a dedicated directory instead of
-# root since otherwise renv will try to scan every subdirectory
-WORKDIR /setup
-
-# Use PPM for binary installs
-ENV RENV_CONFIG_REPOS_OVERRIDE "https://packagemanager.posit.co/cran/__linux__/jammy/latest"
-ENV RENV_CONFIG_SANDBOX_ENABLED FALSE
-ENV RENV_PATHS_LIBRARY renv/library
-ENV RENV_PATHS_CACHE /setup/cache
+FROM pytorch/pytorch:2.3.0-cuda12.1-cudnn8-runtime

[Contributor Author] Installing CUDA appears to be a pain, so for the purposes of benchmarking this PR I just switched to a base image that comes with CUDA installed and refactored the Dockerfile to only install dependencies necessary for running python/comps.py.

 # Install system dependencies
 RUN apt-get update && \
     apt-get install --no-install-recommends -y \
-    libcurl4-openssl-dev libssl-dev libxml2-dev libgit2-dev git \
-    libudunits2-dev python3-dev python3-pip python3-venv libgdal-dev \
-    libgeos-dev libproj-dev libfontconfig1-dev libharfbuzz-dev \
-    libfribidi-dev pandoc curl gdebi-core && \
-    rm -rf /var/lib/apt/lists/*
-
-# Install Quarto
-RUN curl -o quarto-linux-amd64.deb -L \
-    https://github.com/quarto-dev/quarto-cli/releases/download/v1.3.450/quarto-1.3.450-linux-amd64.deb
-RUN gdebi -n quarto-linux-amd64.deb
-
-# Install pipeline Python dependencies globally
-RUN pip install --no-cache-dir dvc[s3]
+    libx11-dev

+# Copy Python requirements file into the image
+COPY requirements.txt ./

-# Copy R bootstrap files into the image
-COPY renv.lock .Rprofile DESCRIPTION requirements.txt ./
-COPY renv/profiles/reporting/renv.lock reporting-renv.lock
-COPY renv/ renv/
-
-# Install R dependencies. Restoring renv first ensures that it's
-# using the same version as recorded in the lockfile
-RUN Rscript -e 'renv::restore(packages = "renv"); renv::restore()'
-RUN Rscript -e 'renv::restore(lockfile = "reporting-renv.lock")'
+# Install Python requirements
+RUN pip install -U -r requirements.txt

 # Set the working directory to the model directory
 WORKDIR /model-res-avm/

 # Copy the directory into the container
 COPY ./ .

-# Copy R dependencies into the model directory
-RUN rm -Rf /model-res-avm/renv && \
-    mv /setup/renv /model-res-avm/renv
-
-CMD dvc pull && dvc repro
+# Run comps algorithm
+CMD python3 python/comps.py
194 changes: 122 additions & 72 deletions python/comps.py
@@ -1,6 +1,12 @@
 import typing
 
 import numpy as np
-import numba as nb
 import pandas as pd
+import taichi as ti
+
+# Initialize taichi
+ti.init(arch=ti.cpu, default_ip=ti.i32, default_fp=ti.f32)
 
 
 def get_comps(
@@ -91,11 +97,30 @@ def get_comps(
 
     # Place observations in bins. Do this in a numba-accelerated function so
    # that we can make use of fast loops
-    observation_df["price_bin"] = _bin_by_price(
-        observation_df[["id", "predicted_value"]].values,
-        price_bin_indices.values
-    )
+    observation_matrix = observation_df[["id", "predicted_value"]].values
+    taichi_obs_ndarray = ti.ndarray(dtype=int, shape=observation_matrix.shape)
+    taichi_obs_ndarray.from_numpy(observation_matrix)
[Contributor Author, on lines +100 to +102] Taichi cannot automatically convert numpy arrays to the data structures that it works with (called fields) so we have to do that manually prior to passing them into the kernel.
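As a minimal sketch of the conversion pattern described above (the arrays here are toy stand-ins, not the PR's real dataframes):

```python
import numpy as np
import taichi as ti

ti.init(arch=ti.cpu)

# Stand-in for observation_df[["id", "predicted_value"]].values
observation_matrix = np.array([[0, 150000], [1, 320000]], dtype=np.int32)

# Allocate a taichi ndarray with a matching shape, then copy the
# numpy data into it before handing it to a kernel
taichi_obs = ti.ndarray(dtype=int, shape=observation_matrix.shape)
taichi_obs.from_numpy(observation_matrix)

# After a kernel runs, results come back out the same way
round_tripped = taichi_obs.to_numpy()
```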

+    price_bin_matrix = price_bin_indices.values
+    taichi_bin_ndarray = ti.ndarray(dtype=int, shape=price_bin_matrix.shape)
+    taichi_bin_ndarray.from_numpy(price_bin_matrix)
+
+    num_observations = observation_matrix.shape[0]
+    num_price_bins = price_bin_matrix.shape[0]
[Contributor Author, on lines +108 to +109] shape and len attributes are not available inside taichi kernels, so the recommended approach for these types of array metadata is to compute them outside the kernel and then pass them into the kernel as arguments.
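A standalone sketch of that pattern (toy kernel with assumed names): the host computes the dimensions and passes them to the kernel as plain int arguments.

```python
import numpy as np
import taichi as ti

ti.init(arch=ti.cpu)

@ti.kernel
def row_sums(mat: ti.types.ndarray(), out: ti.types.ndarray(),
             n_rows: int, n_cols: int):
    # n_rows and n_cols arrive as plain ints computed on the host;
    # the kernel never needs len(mat) or mat.shape
    for i in range(n_rows):
        total = 0
        for j in range(n_cols):
            total += mat[i, j]
        out[i] = total

mat = np.arange(6, dtype=np.int32).reshape(2, 3)
out = np.zeros(2, dtype=np.int32)
row_sums(mat, out, mat.shape[0], mat.shape[1])  # shapes computed host-side
print(out)  # [ 3 12]
```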

+    # Output vector
+    binned_vector = ti.ndarray(dtype=int, shape=(num_observations, 1))
[Contributor Author, on lines +111 to +112] Taichi also cannot return fields from kernels, so we need to define the output data structure ahead of time and then mutate it in the kernel function.
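The resulting preallocate-and-mutate pattern, sketched in isolation with a hypothetical doubling kernel: the caller allocates the output, passes it in, and the kernel writes into it.

```python
import numpy as np
import taichi as ti

ti.init(arch=ti.cpu)

@ti.kernel
def double_into(src: ti.types.ndarray(), dst: ti.types.ndarray(), n: int):
    # Kernels cannot return arrays, so results are written into `dst`,
    # which the caller allocated ahead of time
    for i in range(n):
        dst[i] = src[i] * 2

n = 4
src = ti.ndarray(dtype=int, shape=(n,))
src.from_numpy(np.arange(n, dtype=np.int32))
dst = ti.ndarray(dtype=int, shape=(n,))  # preallocated output
double_into(src, dst, n)
print(dst.to_numpy())  # [0 2 4 6]
```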

+    _bin_by_price(
+        taichi_obs_ndarray,
+        taichi_bin_ndarray,
+        binned_vector,
+        num_observations,
+        num_price_bins
+    )
+
+    # Flatten the (N, 1) result back to one dimension for the column
+    observation_df["price_bin"] = binned_vector.to_numpy().flatten()
 
     total_num_observations = len(observation_df)
     total_num_possible_comps = len(sorted_comparison_df)
     binned_ids, binned_scores = [], []
@@ -150,14 +175,40 @@
             flush=True
         )
 
-    comp_ids, comp_scores = _get_top_n_comps(
-        observation_matrix, possible_comp_matrix, weights_matrix, num_comps
-    )
+    num_observations = len(observation_matrix)
+    num_possible_comparisons = len(possible_comp_matrix)
+
+    # Store scores and indexes in two separate arrays rather than a 3d matrix
+    # for simplicity (array of tuples does not convert to pandas properly).
+    comp_ids = ti.ndarray(dtype=int, shape=(num_observations, num_comps))
+    comp_scores = ti.ndarray(dtype=float, shape=(num_observations, num_comps))
+
+    # Indexes default to -1, which is an impossible index and so is a signal
+    # that no comp was found
+    comp_ids.fill(-1)
+
+    num_trees = observation_matrix.shape[1]
+
+    _get_top_n_comps(
+        observation_matrix,
+        possible_comp_matrix,
+        weights_matrix,
+        comp_ids,
+        comp_scores,
+        num_comps,
+        num_observations,
+        num_possible_comparisons,
+        num_trees,
+    )
 
     # Match comp and observation IDs back to the original dataframes since
     # we have since rearranged them
+    comp_ids = comp_ids.to_numpy()
+    comp_scores = comp_scores.to_numpy()
+
     matched_comp_ids = np.vectorize(comp_idx_to_id.get)(comp_ids)
     observation_ids = observations["id"].values
 
     for obs_idx, comp_idx, comp_score in zip(
         observation_ids, matched_comp_ids, comp_scores
     ):
@@ -193,103 +244,102 @@ def get_comps(
     return indexes_df, scores_df
 
 
-@nb.njit(fastmath=True, parallel=True)
-def _bin_by_price(observation_matrix, price_bin_matrix):
-    """Given a matrix of observations and a matrix of price bins, place the
-    observations in the closest price bin and return an array of bin IDs
-    with the same length as the observation matrix."""
-    num_observations = len(observation_matrix)
+@ti.kernel
+def _bin_by_price(
+    observation_matrix: ti.types.ndarray(),
+    price_bin_matrix: ti.types.ndarray(),
+    output_vector: ti.types.ndarray(),
+    num_observations: int,
+    num_price_bins: int
+):
+    """Given a matrix of observations and a matrix of price bins, place the
+    observations in the closest price bin and fill `output_vector` with the
+    matching bin IDs, one per observation."""
     price_bin_idx, price_bin_min_idx, price_bin_max_idx = 0, 3, 4
     observation_price_idx = 1
-    output_matrix = np.zeros(num_observations, dtype=np.int32)
 
-    for obs_idx in nb.prange(num_observations):
-        observation = observation_matrix[obs_idx]
-        observation_price = observation[observation_price_idx]
-        for bin in price_bin_matrix:
+    for obs_idx in range(num_observations):
+        observation_price = observation_matrix[obs_idx, observation_price_idx]
+        bin_found = False
[Contributor Author] for/else constructs are not supported in taichi, so we have to use a flag variable instead.
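For reference, a sketch of that translation with a toy search kernel (hypothetical names): Python's for/else runs the else arm only when the loop finishes without a `break`, and the flag reproduces exactly that.

```python
import numpy as np
import taichi as ti

ti.init(arch=ti.cpu)

@ti.kernel
def first_match(rows: ti.types.ndarray(), target: int,
                out: ti.types.ndarray(), n_rows: int, n_cols: int):
    for r in range(n_rows):      # outer loop; taichi parallelizes it
        found = False            # flag standing in for for/else
        for c in range(n_cols):  # inner serial loop, so `break` is allowed
            if rows[r, c] == target:
                out[r] = c
                found = True
                break
        if not found:            # the "else" arm: runs only if no break fired
            out[r] = -1

rows = np.array([[3, 7, 9], [1, 2, 4]], dtype=np.int32)
out = np.empty(2, dtype=np.int32)
first_match(rows, 7, out, 2, 3)
print(out)  # [ 1 -1]
```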
+        for bin_idx in range(num_price_bins):
+            bin_min = price_bin_matrix[bin_idx, price_bin_min_idx]
+            bin_max = price_bin_matrix[bin_idx, price_bin_max_idx]
+            bin_id = price_bin_matrix[bin_idx, price_bin_idx]
+
             if (
                 # Since we expect the price bins to be non-overlapping with
                 # no gaps and an integer difference of 1 between ranges, the
                 # ranges can be treated as inclusive on both ends
-                observation_price >= bin[price_bin_min_idx] and
-                observation_price <= bin[price_bin_max_idx]
+                observation_price >= bin_min and observation_price <= bin_max
             ):
-                output_matrix[obs_idx] = bin[price_bin_idx]
+                output_vector[obs_idx, 0] = bin_id
+                bin_found = True
                 break
-        else:
-            raise ValueError(
-                f"Observation {obs_idx} did not match any price bins"
-            )
-
-    return output_matrix
+
+        if not bin_found:
+            # Set a special value to indicate an error, since taichi doesn't
+            # support runtime errors
+            output_vector[obs_idx, 0] = -1
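Since the kernel cannot raise, the caller is responsible for noticing the sentinel. A host-side check along these lines (not part of the diff, sketched here for completeness) would restore the old ValueError behavior:

```python
binned = binned_vector.to_numpy().flatten()
unmatched = np.flatnonzero(binned == -1)
if unmatched.size > 0:
    raise ValueError(
        f"Observations {unmatched.tolist()} did not match any price bins"
    )
```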


-@nb.njit(fastmath=True, parallel=True)
-def _get_top_n_comps(
-    leaf_node_matrix, comparison_leaf_node_matrix, weights_matrix, num_comps
-):
-    """Helper function that takes matrices of leaf node assignments for
-    observations in a tree model, a matrix of weights for each obs/tree, and an
-    integer `num_comps`, and returns a matrix where each observation is scored
-    by similarity to observations in the comparison matrix and the top N scores
-    are returned along with the indexes of the comparison observations."""
-    num_observations = len(leaf_node_matrix)
-    num_possible_comparisons = len(comparison_leaf_node_matrix)
-    idx_dtype = np.int32
-    score_dtype = np.float32
-
-    # Store scores and indexes in two separate arrays rather than a 3d matrix
-    # for simplicity (array of tuples does not convert to pandas properly).
-    # Indexes default to -1, which is an impossible index and so is a signal
-    # that no comp was found
-    all_top_n_idxs = np.full((num_observations, num_comps), -1, dtype=idx_dtype)
-    all_top_n_scores = np.zeros((num_observations, num_comps), dtype=score_dtype)
-
-    for x_i in nb.prange(num_observations):
-        top_n_idxs = np.full(num_comps, -1, dtype=idx_dtype)
-        top_n_scores = np.zeros(num_comps, dtype=score_dtype)
+@ti.kernel
+def _get_top_n_comps(
+    leaf_node_matrix: ti.types.ndarray(),
+    comparison_leaf_node_matrix: ti.types.ndarray(),
+    weights_matrix: ti.types.ndarray(),
+    all_top_n_idxs: ti.types.ndarray(),
+    all_top_n_scores: ti.types.ndarray(),
+    num_comps: int,
+    num_observations: int,
+    num_possible_comparisons: int,
+    num_trees: int
+):
+    """Helper function that takes matrices of leaf node assignments for
+    observations in a tree model, a matrix of weights for each obs/tree, and an
+    integer `num_comps`, scores each observation by similarity to observations
+    in the comparison matrix, and writes the top N scores and the indexes of
+    the corresponding comparison observations into `all_top_n_scores` and
+    `all_top_n_idxs`."""
[Contributor Author, on lines -249 to -250] Arrays cannot be defined inside a taichi kernel, and must instead be defined outside its scope and then passed in as an argument; hence, we needed to refactor this function to remove the dynamic allocation of arrays inside the context of the function.
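In isolation, the workaround looks like this (toy reduction, assumed names): any scratch storage a thread needs becomes a slice of a host-allocated array rather than a kernel-local numpy allocation.

```python
import numpy as np
import taichi as ti

ti.init(arch=ti.cpu)

@ti.kernel
def row_max(scores: ti.types.ndarray(), best: ti.types.ndarray(),
            n_rows: int, n_cols: int):
    for r in range(n_rows):
        # np.zeros(...) is not available here, so each row writes into
        # its own slot of the caller-allocated `best` array instead
        best[r] = scores[r, 0]
        for c in range(1, n_cols):
            if scores[r, c] > best[r]:
                best[r] = scores[r, c]

scores = np.random.rand(4, 8).astype(np.float32)
best = np.zeros(4, dtype=np.float32)
row_max(scores, best, 4, 8)
```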

+    # TODO: We could probably speed this up by skipping comparisons we've
+    # already made; we just need to do it in a way that will have a
+    # low memory footprint
+    for x_i in range(num_observations):
         for y_i in range(num_possible_comparisons):
             similarity_score = 0.0
-            for tree_idx in range(len(leaf_node_matrix[x_i])):
+            for tree_idx in range(num_trees):
                 similarity_score += (
-                    weights_matrix[x_i][tree_idx] * (
-                        leaf_node_matrix[x_i][tree_idx] ==
-                        comparison_leaf_node_matrix[y_i][tree_idx]
+                    weights_matrix[x_i, tree_idx] * (
+                        leaf_node_matrix[x_i, tree_idx] ==
+                        comparison_leaf_node_matrix[y_i, tree_idx]
                     )
                 )
 
             # See if the score is higher than any of the top N
             # comps, and store it in the sorted comps array if it is.
             # First check if the score is higher than the lowest score,
             # since otherwise we don't need to bother iterating the scores
-            if similarity_score > top_n_scores[-1]:
-                for idx, score in enumerate(top_n_scores):
-                    if similarity_score > score:
-                        top_n_idxs = _insert_at_idx_and_shift(
-                            top_n_idxs, y_i, idx
-                        )
-                        top_n_scores = _insert_at_idx_and_shift(
-                            top_n_scores, similarity_score, idx
-                        )
-                        break
-
-        all_top_n_idxs[x_i] = top_n_idxs
-        all_top_n_scores[x_i] = top_n_scores
-
-    return all_top_n_idxs, all_top_n_scores
-
-
-@nb.njit(fastmath=True)
-def _insert_at_idx_and_shift(arr, elem, idx):
-    """Helper function to insert an element `elem` into a sorted numpy array `arr`
-    at a given index `idx` and shift the subsequent elements down one index."""
-    return np.concatenate((
-        arr[:idx], np.array([(elem)], dtype=arr.dtype), arr[idx:-1]
-    ))
+            if similarity_score > all_top_n_scores[x_i, num_comps - 1]:
+                for idx in range(num_comps):
+                    if similarity_score > all_top_n_scores[x_i, idx]:
+                        # Shift scores and indices down to make room for the
+                        # new one. This requires iterating the indices
+                        # backwards; since taichi doesn't support the `step`
+                        # parameter in `range()` calls the way that Python
+                        # does, we count an offset up from zero and derive
+                        # the descending index from it
+                        for offset in range(num_comps - 1 - idx):
+                            i = num_comps - 1 - offset
+                            all_top_n_scores[x_i, i] = all_top_n_scores[x_i, i - 1]
+                            all_top_n_idxs[x_i, i] = all_top_n_idxs[x_i, i - 1]
+
+                        # Insert the new score and index at the correct position
+                        all_top_n_scores[x_i, idx] = similarity_score
+                        all_top_n_idxs[x_i, idx] = y_i
+                        break
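The same reverse-iteration workaround, pulled out into a standalone sketch (hypothetical kernel; `ti.loop_config(serialize=True)` keeps the loop serial, since the shift carries a dependence between iterations and taichi parallelizes outermost kernel loops by default):

```python
import numpy as np
import taichi as ti

ti.init(arch=ti.cpu)

@ti.kernel
def shift_right(arr: ti.types.ndarray(), start: int, n: int):
    # Emulates `for i in range(n - 1, start, -1)`, which taichi's
    # range() cannot express: count an offset up and derive i from it
    ti.loop_config(serialize=True)
    for offset in range(n - 1 - start):
        i = n - 1 - offset
        arr[i] = arr[i - 1]

arr = np.array([5, 6, 7, 8], dtype=np.int32)
shift_right(arr, 1, 4)
print(arr)  # [5 6 6 7]
```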


+@ti.func
+def _insert_at_coord_and_shift(ndarr, x, y, elem, max_len):
+    """Helper function to insert an element `elem` into a sorted ndarray
+    `ndarr` at a given (x, y) coordinate and shift the subsequent elements
+    down one index, with a maximum of `max_len` elements."""
+    # Count an offset upward to emulate `range(max_len - 1, y, -1)`,
+    # since taichi's range() does not accept a step argument
+    for offset in range(max_len - 1 - y):
+        i = max_len - 1 - offset
+        ndarr[x, i] = ndarr[x, i - 1]
+    ndarr[x, y] = elem

if __name__ == "__main__":
Expand All @@ -298,8 +348,8 @@ def _insert_at_idx_and_shift(arr, elem, idx):
     import time
 
     num_trees = 500
-    num_obs = 20001
-    num_comparisons = 10000
+    num_obs = 100000
+    num_comparisons = 50000
     mean_sale_price = 350000
     std_deviation = 110000
1 change: 1 addition & 0 deletions requirements.txt
@@ -6,3 +6,4 @@ python-dateutil==2.8.2
 pytz==2023.3.post1
 six==1.16.0
 tzdata==2023.3
+taichi~=1.7.1