From 36b3c425fbe597d5960bd9076432377a616a9fad Mon Sep 17 00:00:00 2001 From: Adnan Shroufi Date: Fri, 21 Jun 2024 11:19:26 +0100 Subject: [PATCH] updated cqc script finished --- .../workflow/01_upload_cqc_data_from_api.R | 297 ++++++------------ 1 file changed, 104 insertions(+), 193 deletions(-) diff --git a/data-raw/workflow/01_upload_cqc_data_from_api.R b/data-raw/workflow/01_upload_cqc_data_from_api.R index c0c1f46..7c1c435 100644 --- a/data-raw/workflow/01_upload_cqc_data_from_api.R +++ b/data-raw/workflow/01_upload_cqc_data_from_api.R @@ -4,6 +4,9 @@ # Get cqc primary key from environ file key = Sys.getenv("CQC_PRIMARY_KEY") +# Get current year_month +download_date <- as.integer(format(today(), "%Y%m%d")) + # Define cqc columns of interest cqc_cols = c( 'name', @@ -22,12 +25,26 @@ cqc_cols = c( 'postalAddressTownCity', 'postalAddressCounty', 'numberOfBeds', - 'gacServicesTypes', - 'regulatedActivities', + 'gacServiceTypes', 'specialisms', - 'current_ratings', + 'currentRatings', 'odsCode' -) + ) + +# Define provider cols +provider_cols = c( + "providerId", + "name", + "postalAddressLine1", + "postalAddressLine2", + "postalAddressTownCity", + "postalAddressCounty", + "region", + "postalCode", + "uprn", + "companiesHouseNumber", + "lastInspection" + ) # Function to get api content from url get_api_content <- function(url){ @@ -89,11 +106,34 @@ get_location_info_by_id <- function(loc_num) { # Get data data = get_api_content(url) + + # Filter data + filtered_data = data[names(data) %in% cqc_cols] + + # Flat data + flat_data = filtered_data %>% + unlist() %>% + bind_rows() %>% + janitor::clean_names() + + # Return data + return(flat_data) +} + +# Function to query cqc api +get_provider_info_by_id = function(loc_num){ + + # Paste location url with location_id + url = paste0( + "https://api.service.cqc.org.uk/public/v1/providers/", + provider_vec[loc_num] + ) - print(data) + # Get data + data = get_api_content(url) # Filter data - filtered_data = data[names(data) %in% cqc_cols] + filtered_data = data[names(data) %in% provider_cols] # Flat data flat_data = filtered_data %>% @@ -105,7 +145,16 @@ get_location_info_by_id <- function(loc_num) { return(flat_data) } -# Generate Output -------------------------------------------------------------- +# Concatenate all columns (uniquely and without NA) with same prefix +concatenate_by_prefix = function(df, prefix){ + df %>% + mutate({{ prefix }} := pmap_chr( + select(., starts_with(prefix)), + ~ paste(unique(na.omit(c(...))), collapse = "|") + )) +} + +# Generate location output ----------------------------------------------------- # Get total pages total_pages = get_number_of_pages() @@ -145,130 +194,26 @@ parallel::clusterExport( envir = environment() ) -# Generate cqc details -Sys.time() +# Generate cqc details: ~25 mins cqc_data <- parallel::parLapply( cl = clust, X = 1:length(location_vec), fun = get_location_info_by_id ) -Sys.time() # Stop Cluster parallel::stopCluster(clust) +# Format location output ------------------------------------------------------- - - -data$uprn - -a = get_location_info_by_id(48) - -cbind( - data["name"], - data$specialisms, - data$regulatedActivities, - data["locationId"] -) - - -names(data) - - - - - - - - - -# Function to query cqc api -get_cqc_api_location_data <- function(loc_num) { - - # Paste location url with location_id - url = paste0( - "https://api.cqc.org.uk/public/v1/locations/", - location_vec[loc_num] - ) - - # Get data - data = get_api_content(url) %>% - unlist() %>% - bind_rows() - - while (ncol(data) <= 2) { - Sys.sleep(0.05) - - data = get_api_content(url) %>% - unlist() %>% - bind_rows() - } - - - # Return data - return(data) -} - -# Generate appropriate number of cores -n_cores <- parallel::detectCores() - 2 - -# Set up parallel -clust <- parallel::makeCluster(n_cores) - -# Export libraries to cluster -parallel::clusterEvalQ( - cl = clust, - { - library(dplyr); - library(httr); - library(jsonlite); - } -) - -# Export required objects to cluster -parallel::clusterExport( - cl = clust, - varlist = c( - "get_api_content", - "get_cqc_api_location_data", - "location_vec" - ), - envir = environment() -) - -# Print script update -print("Now downloading CQC API location data ...") - -# Generate cqc details -cqc_details <- parallel::parLapply( - cl = clust, - X = 1:length(location_vec), - fun = get_cqc_api_location_data -) - -# Stop Cluster -parallel::stopCluster(clust) - -cqc_details_check <- cqc_details %>% map(\(x) x$locationId) -cqc_details_check <- cqc_details_check[is.na(cqc_details_check)] -num_missing_locations <- length(cqc_details_check) - -if (num_missing_locations > 0) stop("Missing locations, probably due to timeout!") - -# Get current year_month -download_date <- as.integer(format(today(), "%Y%m%d")) - -# Bind all dfs together and apply some transformations -cqc_details_df <- cqc_details %>% +# Bind all dfs together and clean column names +cqc_details_df <- cqc_data %>% bind_rows() %>% janitor::clean_names() %>% - unite_to_plural( - specialisms, - regulated_activities_names, - current_ratings_overall_key_question_ratings_names, - current_ratings_overall_key_question_ratings_ratings, - gac_service_types_names - ) %>% + concatenate_by_prefix(., 'specialisms') %>% + concatenate_by_prefix('current_ratings_overall_key_question_ratings_name') %>% + concatenate_by_prefix('current_ratings_overall_key_question_ratings_rating') %>% + concatenate_by_prefix('gac_service_types_name') %>% tidyr::unite( single_line_address, c( @@ -291,25 +236,25 @@ cqc_details_df <- cqc_details %>% single_line_address, postcode = toupper(gsub("[^[:alnum:]]", "", postal_code)), nursing_home_flag = as.integer(grepl( - "Nursing home", gac_service_types_names + "Nursing home", gac_service_types_name )), residential_home_flag = as.integer(grepl( - "Residential home", gac_service_types_names + "Residential home", gac_service_types_name )), # type, number_of_beds = as.integer(number_of_beds), current_rating = current_ratings_overall_rating, - key_question_names = current_ratings_overall_key_question_ratings_names, - key_question_ratings = current_ratings_overall_key_question_ratings_ratings, + key_question_names = current_ratings_overall_key_question_ratings_name, + key_question_ratings = current_ratings_overall_key_question_ratings_rating, cqc_date = download_date, ods_code, specialisms, - regulated_activities_names, - gac_service_types = gac_service_types_names, + gac_service_types = gac_service_types_name, .keep = "none" - ) + ) %>% + addressMatchR::tidy_single_line_address(single_line_address) -gc() +# Generate provider output ----------------------------------------------------- # Get provider id vec provider_vec = cqc_details_df %>% @@ -317,37 +262,8 @@ provider_vec = cqc_details_df %>% distinct() %>% pull() -# Function to query cqc api -get_cqc_api_provider_data = function(loc_num){ - - # Wait - Sys.sleep(0.05) - - # Paste location url with location_id - url = paste0( - "https://api.cqc.org.uk/public/v1/providers/", - provider_vec[loc_num] - ) - - # Get data - data = get_api_content(url) %>% - unlist() %>% - bind_rows() - - while (ncol(data) <= 2) { - Sys.sleep(0.05) - - data = get_api_content(url) %>% - unlist() %>% - bind_rows() - } - - # Return data - return(data) -} - # Generate appropriate number of cores -n_cores <- parallel::detectCores() - 2 +n_cores <- parallel::detectCores() - 1 # Set up parallel clust <- parallel::makeCluster(n_cores) @@ -357,6 +273,7 @@ parallel::clusterEvalQ( cl = clust, { library(dplyr); + library(janitor); library(httr); library(jsonlite); } @@ -367,46 +284,42 @@ parallel::clusterExport( cl = clust, varlist = c( "get_api_content", - "get_cqc_api_provider_data", - "provider_vec" + "provider_vec", + "key", + "provider_cols" ), envir = environment() ) -# Print script update -print("Now downloading CQC API provider data ...") - -# Generate cqc details -cqc_details <- parallel::parLapply( +# Generate provider details: ~15 mins +provider_data <- parallel::parLapply( cl = clust, X = 1:length(provider_vec), - fun = get_cqc_api_provider_data + fun = get_provider_info_by_id ) # Stop Cluster parallel::stopCluster(clust) -cqc_details_check <- cqc_details %>% map(\(x) x$locationId) -cqc_details_check <- cqc_details_check[is.na(cqc_details_check)] -num_missing_providers <- length(cqc_details_check) - -if (num_missing_providers > 0) stop("Missing providers, probably due to timeout!") +# Process provider output ------------------------------------------------------ # Process the provider data -cqc_providers_df = cqc_details %>% +cqc_providers_df = provider_data %>% bind_rows() %>% janitor::clean_names() %>% - mutate( - # Paste fields together to create single line address - provider_sla = toupper(paste( - ifelse(is.na(name), "", name), - ifelse(is.na(postal_address_line1), "", postal_address_line1), - ifelse(is.na(postal_address_town_city), "", postal_address_town_city), - ifelse(is.na(region), "", region) - )), - # Postcode Cleaning - postal_code = toupper(gsub("[^[:alnum:]]", "", postal_code)) + tidyr::unite( + provider_sla, + c( + name, + postal_address_line1, + postal_address_line2, + postal_address_town_city, + postal_address_county + ), + sep = " ", + na.rm = TRUE ) %>% + mutate(postal_code = toupper(gsub("[^[:alnum:]]", "", postal_code))) %>% select( provider_id, provider_uprn = uprn, @@ -414,21 +327,18 @@ cqc_providers_df = cqc_details %>% provider_sla, provider_postcode = postal_code, provider_last_inspection_date = last_inspection_date - ) + ) %>% + addressMatchR::tidy_single_line_address(provider_sla) + +# Join location and providder data then save ----------------------------------- # Process the cqc df output, in preparation for matching -cqc_process_df <- cqc_details_df %>% +cqc_final_df <- cqc_details_df %>% left_join(cqc_providers_df, by = "provider_id") %>% - addressMatchR::tidy_single_line_address(single_line_address) %>% - addressMatchR::tidy_single_line_address(provider_sla) %>% rename_with(toupper) -# Check na count per column -print("NA count per column") -print(colSums(is.na(cqc_process_df))) - # Create table name -table_name <- paste0("INT646_CQC_", download_date) +table_name <- paste0("CQC_BASE_", download_date) # Set up connection to the DB con <- nhsbsaR::con_nhsbsa(database = "DALP") @@ -436,9 +346,10 @@ con <- nhsbsaR::con_nhsbsa(database = "DALP") # Drop table if it exists already drop_table_if_exists_db(table_name) -# Upload to DB... -cqc_process_df %>% write_table_long_chars(con, table_name) -# ...and add indexes +# Upload to DB +cqc_final_df %>% write_table_long_chars(con, table_name) + +# Add indexes con %>% add_indexes(table_name, c("UPRN", "POSTCODE")) # Grant access