diff --git a/.github/workflows/build-and-run-model.yaml b/.github/workflows/build-and-run-model.yaml
index 50a625cf..145e4786 100644
--- a/.github/workflows/build-and-run-model.yaml
+++ b/.github/workflows/build-and-run-model.yaml
@@ -69,11 +69,12 @@ jobs:
       # required in order to allow the reusable called workflow to push to
       # GitHub Container Registry
      packages: write
-    uses: ccao-data/actions/.github/workflows/build-and-run-batch-job.yaml@main
+    uses: ccao-data/actions/.github/workflows/build-and-run-batch-job.yaml@jeancochrane/spike-c5-instances-for-build-and-run-batch-job
     with:
+      ref: jeancochrane/spike-c5-instances-for-build-and-run-batch-job
       backend: "ec2"
-      vcpu: "40"
-      memory: "158000"
+      vcpu: "90"
+      memory: "180000"
       # Maximum pipeline runtime. This is slightly below 6 hours, which
       # is the maximum length of any single GitHub Actions job
       role-duration-seconds: 21000
diff --git a/Dockerfile b/Dockerfile
index ae5cb469..839a4349 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,41 +1,16 @@
-FROM rocker/r-ver:4.3.2
-
-# Set the working directory to setup. Uses a dedicated directory instead of
-# root since otherwise renv will try to scan every subdirectory
-WORKDIR /setup
-
-# Use PPM for binary installs
-ENV RENV_CONFIG_REPOS_OVERRIDE "https://packagemanager.posit.co/cran/__linux__/jammy/latest"
-ENV RENV_CONFIG_SANDBOX_ENABLED FALSE
-ENV RENV_PATHS_LIBRARY renv/library
-ENV RENV_PATHS_CACHE /setup/cache
+FROM pytorch/pytorch:2.3.0-cuda12.1-cudnn8-runtime
 
 # Install system dependencies
 RUN apt-get update && \
     apt-get install --no-install-recommends -y \
-    libcurl4-openssl-dev libssl-dev libxml2-dev libgit2-dev git \
-    libudunits2-dev python3-dev python3-pip python3-venv libgdal-dev \
-    libgeos-dev libproj-dev libfontconfig1-dev libharfbuzz-dev \
-    libfribidi-dev pandoc curl gdebi-core && \
+    libx11-dev && \
     rm -rf /var/lib/apt/lists/*
-
-# Install Quarto
-RUN curl -o quarto-linux-amd64.deb -L \
-    https://github.com/quarto-dev/quarto-cli/releases/download/v1.3.450/quarto-1.3.450-linux-amd64.deb
-RUN gdebi -n quarto-linux-amd64.deb
 
-# Install pipeline Python dependencies globally
-RUN pip install --no-cache-dir dvc[s3]
+# Copy Python requirements file into the image
+COPY requirements.txt ./
 
-# Copy R bootstrap files into the image
-COPY renv.lock .Rprofile DESCRIPTION requirements.txt ./
-COPY renv/profiles/reporting/renv.lock reporting-renv.lock
-COPY renv/ renv/
-
-# Install R dependencies. Restoring renv first ensures that it's
-# using the same version as recorded in the lockfile
-RUN Rscript -e 'renv::restore(packages = "renv"); renv::restore()'
-RUN Rscript -e 'renv::restore(lockfile = "reporting-renv.lock")'
+# Install Python requirements
+RUN pip install --no-cache-dir -U -r requirements.txt
 
 # Set the working directory to the model directory
 WORKDIR /model-res-avm/
@@ -43,8 +18,5 @@ WORKDIR /model-res-avm/
 # Copy the directory into the container
 COPY ./ .
 
-# Copy R dependencies into the model directory
-RUN rm -Rf /model-res-avm/renv && \
-    mv /setup/renv /model-res-avm/renv
-
-CMD dvc pull && dvc repro
+# Run the comps algorithm
+CMD python3 python/comps.py
diff --git a/python/comps.py b/python/comps.py
index bfaacb29..75f2d27b 100644
--- a/python/comps.py
+++ b/python/comps.py
@@ -1,6 +1,10 @@
 import numpy as np
 import numba as nb
 import pandas as pd
+import taichi as ti
+
+# Initialize taichi
+ti.init(arch=ti.cpu, default_ip=ti.i32, default_fp=ti.f32)
 
 
 def get_comps(
@@ -91,11 +95,35 @@ def get_comps(
 
-    # Place observations in bins. Do this in a numba-accelerated function so
-    # that we can make use of fast loops
-    observation_df["price_bin"] = _bin_by_price(
-        observation_df[["id", "predicted_value"]].values,
-        price_bin_indices.values
+    # Place observations in bins. Do this in a taichi-accelerated kernel so
+    # that we can make use of fast loops
+    observation_matrix = observation_df[["id", "predicted_value"]].values
+    taichi_obs_ndarray = ti.ndarray(dtype=int, shape=observation_matrix.shape)
+    taichi_obs_ndarray.from_numpy(observation_matrix)
+
+    price_bin_matrix = price_bin_indices.values
+    taichi_bin_ndarray = ti.ndarray(dtype=int, shape=price_bin_matrix.shape)
+    taichi_bin_ndarray.from_numpy(price_bin_matrix)
+
+    num_observations = observation_matrix.shape[0]
+    num_price_bins = price_bin_matrix.shape[0]
+
+    # Output vector for bin IDs
+    binned_vector = ti.ndarray(dtype=int, shape=(num_observations, 1))
+
+    _bin_by_price(
+        taichi_obs_ndarray,
+        taichi_bin_ndarray,
+        binned_vector,
+        num_observations,
+        num_price_bins
     )
 
+    observation_df["price_bin"] = binned_vector.to_numpy().flatten()
+
+    # The kernel flags observations that matched no bin with -1, since taichi
+    # kernels can't raise exceptions the way the old numba function could
+    if (observation_df["price_bin"] == -1).any():
+        raise ValueError("Some observations did not match any price bins")
+
     total_num_observations = len(observation_df)
     total_num_possible_comps = len(sorted_comparison_df)
     binned_ids, binned_scores = [], []
@@ -150,14 +178,40 @@ def get_comps(
             flush=True
         )
 
-        comp_ids, comp_scores = _get_top_n_comps(
-            observation_matrix, possible_comp_matrix, weights_matrix, num_comps
+        num_observations = len(observation_matrix)
+        num_possible_comparisons = len(possible_comp_matrix)
+
+        # Store scores and indexes in two separate arrays rather than a 3d matrix
+        # for simplicity (array of tuples does not convert to pandas properly).
+        comp_ids = ti.ndarray(dtype=int, shape=(num_observations, num_comps))
+        comp_scores = ti.ndarray(dtype=float, shape=(num_observations, num_comps))
+
+        # Indexes default to -1, which is an impossible index and so is a signal
+        # that no comp was found
+        comp_ids.fill(-1)
+
+        num_trees = observation_matrix.shape[1]
+
+        _get_top_n_comps(
+            observation_matrix,
+            possible_comp_matrix,
+            weights_matrix,
+            comp_ids,
+            comp_scores,
+            num_comps,
+            num_observations,
+            num_possible_comparisons,
+            num_trees,
         )
 
         # Match comp and observation IDs back to the original dataframes since
         # we have since rearranged them
+        comp_ids = comp_ids.to_numpy()
+        comp_scores = comp_scores.to_numpy()
+
         matched_comp_ids = np.vectorize(comp_idx_to_id.get)(comp_ids)
         observation_ids = observations["id"].values
+
         for obs_idx, comp_idx, comp_score in zip(
             observation_ids, matched_comp_ids, comp_scores
         ):
@@ -193,72 +247,69 @@ def get_comps(
     return indexes_df, scores_df
 
 
-@nb.njit(fastmath=True, parallel=True)
-def _bin_by_price(observation_matrix, price_bin_matrix):
+@ti.kernel
+def _bin_by_price(
+    observation_matrix: ti.types.ndarray(),
+    price_bin_matrix: ti.types.ndarray(),
+    output_vector: ti.types.ndarray(),
+    num_observations: int,
+    num_price_bins: int
+):
     """Given a matrix of observations and a matrix of price bins, place
-    the observations in the closest price bin and return an array of bin IDs
-    with the same length as the observation matrix."""
-    num_observations = len(observation_matrix)
+    the observations in the closest price bin and write the bin IDs into
+    `output_vector`, which has one row per observation."""
     price_bin_idx, price_bin_min_idx, price_bin_max_idx = 0, 3, 4
     observation_price_idx = 1
-    output_matrix = np.zeros(num_observations, dtype=np.int32)
 
-    for obs_idx in nb.prange(num_observations):
-        observation = observation_matrix[obs_idx]
-        observation_price = observation[observation_price_idx]
-        for bin in price_bin_matrix:
+    for obs_idx in range(num_observations):
+        observation_price = observation_matrix[obs_idx, observation_price_idx]
+        bin_found = False
+        for bin_idx in range(num_price_bins):
+            bin_min = price_bin_matrix[bin_idx, price_bin_min_idx]
+            bin_max = price_bin_matrix[bin_idx, price_bin_max_idx]
+            bin_id = price_bin_matrix[bin_idx, price_bin_idx]
+
             if (
                 # Since we expect the price bins to be non-overlapping with
                 # no gaps and an integer difference of 1 between ranges, the
                 # ranges can be treated as inclusive on both ends
-                observation_price >= bin[price_bin_min_idx] and
-                observation_price <= bin[price_bin_max_idx]
+                observation_price >= bin_min and observation_price <= bin_max
             ):
-                output_matrix[obs_idx] = bin[price_bin_idx]
+                output_vector[obs_idx, 0] = bin_id
+                bin_found = True
                 break
-        else:
-            raise ValueError(
-                f"Observation {obs_idx} did not match any price bins"
-            )
-
-    return output_matrix
+        if not bin_found:
+            # Set a special value to indicate an error, since taichi doesn't
+            # support runtime errors; the caller checks for this sentinel
+            output_vector[obs_idx, 0] = -1
 
 
-@nb.njit(fastmath=True, parallel=True)
+@ti.kernel
 def _get_top_n_comps(
-    leaf_node_matrix, comparison_leaf_node_matrix, weights_matrix, num_comps
+    leaf_node_matrix: ti.types.ndarray(),
+    comparison_leaf_node_matrix: ti.types.ndarray(),
+    weights_matrix: ti.types.ndarray(),
+    all_top_n_idxs: ti.types.ndarray(),
+    all_top_n_scores: ti.types.ndarray(),
+    num_comps: int,
+    num_observations: int,
+    num_possible_comparisons: int,
+    num_trees: int
 ):
     """Helper function that takes matrices of leaf node assignments for
     observations in a tree model, a matrix of weights for each obs/tree, and an
-    integer `num_comps`, and returns a matrix where each observation is scored
-    by similarity to observations in the comparison matrix and the top N scores
-    are returned along with the indexes of the comparison observations."""
-    num_observations = len(leaf_node_matrix)
-    num_possible_comparisons = len(comparison_leaf_node_matrix)
-    idx_dtype = np.int32
-    score_dtype = np.float32
-
-    # Store scores and indexes in two separate arrays rather than a 3d matrix
-    # for simplicity (array of tuples does not convert to pandas properly).
-    # Indexes default to -1, which is an impossible index and so is a signal
-    # that no comp was found
-    all_top_n_idxs = np.full((num_observations, num_comps), -1, dtype=idx_dtype)
-    all_top_n_scores = np.zeros((num_observations, num_comps), dtype=score_dtype)
-
-    for x_i in nb.prange(num_observations):
-        top_n_idxs = np.full(num_comps, -1, dtype=idx_dtype)
-        top_n_scores = np.zeros(num_comps, dtype=score_dtype)
-
-        # TODO: We could probably speed this up by skipping comparisons we've
-        # already made; we just need to do it in a way that will have a
-        # low memory footprint
+    integer `num_comps`, scores each observation by similarity to observations
+    in the comparison matrix, and writes the top N scores and the indexes of
+    the corresponding comparison observations into the output ndarrays."""
+    for x_i in range(num_observations):
         for y_i in range(num_possible_comparisons):
             similarity_score = 0.0
-            for tree_idx in range(len(leaf_node_matrix[x_i])):
+            for tree_idx in range(num_trees):
                 similarity_score += (
-                    weights_matrix[x_i][tree_idx] * (
-                        leaf_node_matrix[x_i][tree_idx] ==
-                        comparison_leaf_node_matrix[y_i][tree_idx]
+                    weights_matrix[x_i, tree_idx] * (
+                        leaf_node_matrix[x_i, tree_idx] ==
+                        comparison_leaf_node_matrix[y_i, tree_idx]
                     )
                 )
@@ -266,30 +317,36 @@ def _get_top_n_comps(
             # Check to see if the score is higher than any of the top N
             # comps, and store it in the sorted comps array if it is.
             # First check if the score is higher than the lowest score,
             # since otherwise we don't need to bother iterating the scores
-            if similarity_score > top_n_scores[-1]:
-                for idx, score in enumerate(top_n_scores):
-                    if similarity_score > score:
-                        top_n_idxs = _insert_at_idx_and_shift(
-                            top_n_idxs, y_i, idx
-                        )
-                        top_n_scores = _insert_at_idx_and_shift(
-                            top_n_scores, similarity_score, idx
-                        )
-                        break
-
-        all_top_n_idxs[x_i] = top_n_idxs
-        all_top_n_scores[x_i] = top_n_scores
-
-    return all_top_n_idxs, all_top_n_scores
-
-
-@nb.njit(fastmath=True)
-def _insert_at_idx_and_shift(arr, elem, idx):
-    """Helper function to insert an element `elem` into a sorted numpy array `arr`
-    at a given index `idx` and shift the subsequent elements down one index."""
-    return np.concatenate((
-        arr[:idx], np.array([(elem)], dtype=arr.dtype), arr[idx:-1]
-    ))
+            if similarity_score > all_top_n_scores[x_i, num_comps - 1]:
+                for idx in range(num_comps):
+                    if similarity_score > all_top_n_scores[x_i, idx]:
+                        # Shift scores and indices down to make room for the
+                        # new one. This requires iterating backwards; since
+                        # taichi doesn't support the `step` parameter in
+                        # `range()` calls the way that Python does, map an
+                        # ascending offset onto descending indices instead
+                        for offset in range(num_comps - 1 - idx):
+                            i = num_comps - 1 - offset
+                            all_top_n_scores[x_i, i] = all_top_n_scores[x_i, i - 1]
+                            all_top_n_idxs[x_i, i] = all_top_n_idxs[x_i, i - 1]
+
+                        # Insert the new score and index at the correct position
+                        all_top_n_scores[x_i, idx] = similarity_score
+                        all_top_n_idxs[x_i, idx] = y_i
+                        break
+
+
+@ti.func
+def _insert_at_coord_and_shift(ndarr, x, y, elem, max_len):
+    """Helper function to insert an element `elem` into a sorted ndarray
+    `ndarr` at a given (x, y) coordinate and shift the subsequent elements
+    down one index, with a maximum of `max_len` elements per row. Uses the
+    same ascending-offset workaround as the kernel above, since taichi's
+    `range()` does not accept a `step` argument."""
+    for offset in range(max_len - 1 - y):
+        i = max_len - 1 - offset
+        ndarr[x, i] = ndarr[x, i - 1]
+    ndarr[x, y] = elem
 
 if __name__ == "__main__":
@@ -298,8 +355,8 @@ def _insert_at_idx_and_shift(arr, elem, idx):
     import time
 
     num_trees = 500
-    num_obs = 20001
-    num_comparisons = 10000
+    num_obs = 100000
+    num_comparisons = 50000
 
     mean_sale_price = 350000
     std_deviation = 110000
diff --git a/requirements.txt b/requirements.txt
index ac7c856e..01742a65 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,3 +6,4 @@ python-dateutil==2.8.2
 pytz==2023.3.post1
 six==1.16.0
 tzdata==2023.3
+taichi~=1.7.1
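
A few self-contained sketches follow for reviewers who are new to taichi. None of this code is part of the diff, and every name in it is illustrative rather than taken from the codebase.

First, the host-side round-trip that `get_comps` now performs: allocate a `ti.ndarray`, fill it with `from_numpy`, mutate it inside a kernel, and read it back with `to_numpy`. One thing to watch is that `from_numpy` expects the numpy array's dtype to line up with the ndarray's dtype, and numpy defaults to int64/float64 while the `ti.init` call in the diff defaults to i32/f32, so explicit casts on the numpy side may be needed:

```python
import numpy as np
import taichi as ti

ti.init(arch=ti.cpu, default_ip=ti.i32, default_fp=ti.f32)


@ti.kernel
def double_in_place(arr: ti.types.ndarray()):
    # The outermost for loop in a taichi kernel is parallelized automatically
    for i in range(arr.shape[0]):
        arr[i] = arr[i] * 2


host_data = np.arange(10, dtype=np.int32)  # cast to match the i32 default
device_data = ti.ndarray(dtype=ti.i32, shape=host_data.shape)
device_data.from_numpy(host_data)

double_in_place(device_data)
print(device_data.to_numpy())  # [ 0  2  4  6  8 10 12 14 16 18]
```

Kernels also accept plain numpy arrays for `ti.types.ndarray()` parameters, which is why the `_get_top_n_comps` call site can pass `observation_matrix` directly; the explicit `ti.ndarray` round-trip is only needed when the output buffer should live with taichi, as with `binned_vector`.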
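Second, the shift loop inside `_get_top_n_comps` is worth seeing in isolation, because it is the easiest part of the port to get wrong: taichi's `range()` takes at most two arguments in kernel scope, so a descending walk like `range(n - 1, pos, -1)` has to be rewritten as an ascending loop over offsets. A related wrinkle is that taichi parallelizes the outermost for loop of a kernel; the shift is order-dependent, so when it is the outermost loop (unlike in the diff, where it sits inside the parallel `x_i` loop and is therefore already serial) it must be pinned down with `ti.loop_config(serialize=True)`. A minimal sketch, assuming a taichi version that provides `ti.loop_config`:

```python
import numpy as np
import taichi as ti

ti.init(arch=ti.cpu, default_fp=ti.f32)


@ti.kernel
def shift_right_and_insert(arr: ti.types.ndarray(), pos: int, val: ti.f32):
    n = arr.shape[0]
    # Equivalent to `for i in range(n - 1, pos, -1)`, which taichi rejects:
    # walk ascending offsets and map them onto descending indices. The loop
    # is order-dependent, so opt out of outermost-loop parallelization
    ti.loop_config(serialize=True)
    for offset in range(n - 1 - pos):
        i = n - 1 - offset
        arr[i] = arr[i - 1]
    arr[pos] = val


scores = ti.ndarray(dtype=ti.f32, shape=(4,))
scores.from_numpy(np.array([9.0, 7.0, 5.0, 3.0], dtype=np.float32))
shift_right_and_insert(scores, 1, 8.0)
print(scores.to_numpy())  # [9. 8. 7. 5.]
```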
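Finally, since the benchmark in `__main__` now runs at sizes where eyeballing output is impractical, a dense numpy oracle can be handy for spot-checking the kernel on small inputs. This is hypothetical code, not part of the diff: it materializes the full observation-by-comparison score matrix, so it is only usable as a test oracle, and the stable sort breaks ties toward the earliest comparison index, matching the kernel's strict-greater insertion rule:

```python
import numpy as np


def top_n_comps_reference(leaf_nodes, comparison_leaf_nodes, weights, n):
    """Slow reference for _get_top_n_comps: leaf_nodes is (obs, trees),
    comparison_leaf_nodes is (comps, trees), weights is (obs, trees).
    Assumes there are at least `n` comparison observations."""
    # (obs, comps, trees) tensor of matching leaf assignments
    matches = (
        leaf_nodes[:, None, :] == comparison_leaf_nodes[None, :, :]
    ).astype(np.float32)
    # Weighted match count per (observation, comparison) pair
    scores = np.einsum("ot,oct->oc", weights.astype(np.float32), matches)
    # Take the n best comparisons per observation, best first
    top_idxs = np.argsort(-scores, axis=1, kind="stable")[:, :n]
    top_scores = np.take_along_axis(scores, top_idxs, axis=1)
    return top_idxs, top_scores
```

Running this and the kernel on the same handful of rows and asserting `np.allclose` on the scores (f32 accumulation order can differ slightly) gives quick confidence in the port.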