Skip to content

Commit

Permalink
updating with summaries
Browse files Browse the repository at this point in the history
  • Loading branch information
rqthomas committed Nov 17, 2023
1 parent 822cfb0 commit 6a68c62
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 46 deletions.
41 changes: 41 additions & 0 deletions R/build_summaries.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
config <- yaml::read_yaml("../challenge_configuration.yaml")

s3_inventory <- arrow::s3_bucket(paste0(config$inventory_bucket,"/catalog/forecasts/project_id=", config$project_id),
endpoint_override = config$endpoint,
access_key = Sys.getenv("OSN_KEY"),
secret_key = Sys.getenv("OSN_SECRET"))

inventory_df <- arrow::open_dataset(s3_inventory) |> dplyr::collect()


df <- inventory_df |> dplyr::distinct(duration, model_id, variable, project_id, path, endpoint)


s3 <- arrow::s3_bucket(config$forecasts_bucket,
endpoint_override = config$endpoint,
access_key = Sys.getenv("OSN_KEY"),
secret_key = Sys.getenv("OSN_SECRET"))


for(i in 1:nrow(df)){

print(i)

arrow::open_dataset(paste0("s3://anonymous@",df$path[i],"/model_id=",df$model_id[i],"?endpoint_override=",df$endpoint[i])) |>
dplyr::mutate(model_id = df$model_id[i],
variable = df$variable[i],
duration = df$duration[i],
project_id = df$project_id[i]) |>
dplyr::collect() |>
dplyr::summarise(prediction = mean(prediction), .by = dplyr::any_of(c("site_id", "datetime", "reference_datetime", "family", "depth_m", "duration", "model_id",
"parameter", "pub_datetime", "reference_date", "variable", "project_id"))) |>
score4cast::summarize_forecast(extra_groups = c("duration", "project_id", "depth_m")) |>
dplyr::mutate(reference_date = lubridate::as_date(reference_datetime)) |>
arrow::write_dataset(s3$path("summaries"), format = 'parquet',
partitioning = c("project_id",
"duration",
"variable",
"model_id",
"reference_date"))

}
1 change: 1 addition & 0 deletions R/rebuild_inventory.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ inventory_df <- arrow::open_dataset(s3) |>
collect() |>
mutate(path = glue::glue("{bucket}/parquet/project_id={project_id}/duration={duration}/variable={variable}"),
path_full = glue::glue("{bucket}/parquet/project_id={project_id}/duration={duration}/variable={variable}/model_id={model_id}/reference_date={reference_date}/part-0.parquet"),
path_summaries = glue::glue("{bucket}/summaries/project_id={project_id}/duration={duration}/variable={variable}/model_id={model_id}/reference_date={reference_date}/part-0.parquet"),
endpoint =config$endpoint)


Expand Down
4 changes: 3 additions & 1 deletion dashboard/cache.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ install_mc()
mc_alias_set("mc_bucket", endpoint = config$endpoint,
access_key = "", secret_key = "")

mc(paste0("mirror --overwrite mc_bucket/",config$scores_bucket,"/parquet/project_id=", config$project_id," cache/"))
mc(paste0("mirror --overwrite mc_bucket/",config$scores_bucket,"/parquet/project_id=", config$project_id," cache/scores"))

mc(paste0("mirror --overwrite mc_bucket/",config$forecasts_bucket,"/summaries/project_id=", config$project_id," cache/summaries"))
88 changes: 48 additions & 40 deletions dashboard/performance.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,23 @@ This page visualizes the forecasts and forecast performance for the focal target
## Most recent forecasts {#sec-performance}

```{r}
cutoff <- as.character(Sys.Date() - 30)
combined <- arrow::open_dataset("../cache/duration=P1D") |>
filter(date >= cutoff) |> collect()
reference_datetimes <- arrow::open_dataset("../cache/summaries/duration=P1D") |>
group_by(variable) |>
dplyr::summarize(reference_datetime_max = max(reference_datetime)) |>
dplyr::collect()
cutoff <- as.character(Sys.Date() - 30)
config <- yaml::read_yaml("../challenge_configuration.yaml")
sites <- readr::read_csv(paste0("../", config$site_table), show_col_types = FALSE)
df <- combined |>
df <- arrow::open_dataset("../cache/summaries/duration=P1D") |>
left_join(reference_datetimes, by = "variable") |>
filter(reference_datetime == reference_datetime_max) |>
left_join(sites, by = "site_id") |>
filter(site_id %in% sites$site_id) |>
mutate(reference_datetime = lubridate::as_datetime(reference_datetime),
Expand All @@ -53,10 +59,10 @@ Forecasts submitted on `r reference_date`
if("Chla_ugL_mean" %in% unique(df$variable)){
df |>
filter(lubridate::as_date(reference_datetime) > max(lubridate::as_date(df$reference_datetime)) - lubridate::days(1),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
filter(lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("Chla_ugL_mean"),
((depth_m == 1.6 & site_id == "fcre") | (depth_m == 1.5 & site_id == "bvre"))) |>
mutate(observation = as.numeric(NA)) |>
forecast_plots()
}
```
Expand All @@ -68,10 +74,10 @@ df |>
if("Bloom_binary_mean" %in% unique(df$variable)){
ggobj_df <- df |>
filter(lubridate::as_date(reference_datetime) == max(lubridate::as_date(df$reference_datetime)) - lubridate::days(1),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
filter(lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("Bloom_binary_mean"),
((depth_m == 1.6 & site_id == "fcre") | (depth_m == 1.5 & site_id == "bvre")))
((depth_m == 1.6 & site_id == "fcre") | (depth_m == 1.5 & site_id == "bvre"))) |>
mutate(observation = as.numeric(NA))
if(nrow(ggobj_df) > 0){
Expand Down Expand Up @@ -100,20 +106,19 @@ girafe(ggobj = ggobj,

```{r}
df |>
filter(lubridate::as_date(reference_datetime) == max(lubridate::as_date(df$reference_datetime)) - lubridate::days(2),
variable == c("Temp_C_mean"),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime)) |>
#((depth_m == 1.6 & site_id == "fcre") | (depth_m == 1.5 & site_id == "bvre"))) |>
filter(variable == c("Temp_C_mean"),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime)) |>
mutate(observation = as.numeric(NA)) |>
forecast_plots()
```

### Air temperature

```{r}
df |>
filter(lubridate::as_date(reference_datetime) == max(lubridate::as_date(df$reference_datetime)) - lubridate::days(1),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("AirTemp_C_mean")) |>
filter(lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("AirTemp_C_mean")) |>
mutate(observation = as.numeric(NA)) |>
forecast_plots()
```

Expand All @@ -124,9 +129,9 @@ df |>
if("fDOM_QSU_mean" %in% unique(df$variable)){
df |>
filter(lubridate::as_date(reference_datetime) == max(lubridate::as_date(df$reference_datetime)) - lubridate::days(1),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("fDOM_QSU_mean")) |>
filter(lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("fDOM_QSU_mean")) |>
mutate(observation = as.numeric(NA)) |>
forecast_plots()
}
```
Expand All @@ -136,11 +141,10 @@ df |>
```{r}
if("Secchi_m_sample" %in% unique(df$variable)){
df |>
filter(lubridate::as_date(reference_datetime) == max(lubridate::as_date(df$reference_datetime)) - lubridate::days(1),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("Secchi_m_sample")) |>
filter(lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("Secchi_m_sample")) |>
mutate(observation = as.numeric(NA)) |>
forecast_plots()
}
Expand All @@ -152,9 +156,9 @@ df |>
if("IceCover_binary_sum" %in% unique(df$variable)){
df |>
filter(lubridate::as_date(reference_datetime) == max(lubridate::as_date(df$reference_datetime)) - lubridate::days(1),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
filter(lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("IceCover_binary_sum")) |>
mutate(observation = as.numeric(NA)) |>
forecast_plots()
}
```
Expand All @@ -164,9 +168,9 @@ df |>
```{r}
if("CH4_umolL_sample" %in% unique(df$variable)){
df |>
filter(lubridate::as_date(reference_datetime) == max(lubridate::as_date(df$reference_datetime)) - lubridate::days(1),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("CH4_umolL_sample")) |>
filter(lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("CH4_umolL_sample")) |>
mutate(observation = as.numeric(NA)) |>
forecast_plots()
}
```
Expand All @@ -177,9 +181,9 @@ df |>
```{r}
if("Flow_cms_mean" %in% unique(df$variable)){
df |>
filter(lubridate::as_date(reference_datetime) == max(lubridate::as_date(df$reference_datetime)) - lubridate::days(1),
lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("Flow_cms_mean")) |>
filter(lubridate::as_date(datetime) > lubridate::as_date(reference_datetime),
variable == c("Flow_cms_mean")) |>
mutate(observation = as.numeric(NA)) |>
forecast_plots()
}
```
Expand All @@ -189,21 +193,25 @@ df |>
## Forecast analysis

```{r}
sites <- combined |> distinct(site_id) |> collect() |> slice_head(n= 6) |> pull(site_id)
## with at least n observations to compare!
df <- arrow::open_dataset("../cache/scores/duration=P1D") |>
left_join(sites, by = "site_id") |>
filter(site_id %in% sites$site_id) |>
mutate(reference_datetime = lubridate::as_datetime(reference_datetime),
datetime = lubridate::as_datetime(datetime)) |>
collect()
ref <- Sys.Date() - lubridate::days(10)
ref <- Sys.Date() - lubridate::days(30)
ref <- max(c(ref,
min(lubridate::as_date(combined$reference_datetime)),
min(lubridate::as_date(df$reference_datetime)),
lubridate::as_date("2023-10-14")))
#n_data <- 10
#who <- combined |> filter(!is.na(observation)) |> summarise(has_data = max(reference_datetime)) |> collect()
#ref <- as.character ( as.Date(who$has_data[[1]]) - n_data )
ex <- combined |>
ex <- df |>
mutate(reference_date = lubridate::as_date(reference_datetime)) |>
filter(reference_date == ref, site_id %in% sites) |> collect()
filter(reference_date == ref)
```

Expand Down Expand Up @@ -255,18 +263,18 @@ Scores are averaged across all submissions of the model with a given horizon or
## Chlorophyll-a

```{r}
leaderboard_plots(combined, "Chla_ugL_mean")
leaderboard_plots(df, "Chla_ugL_mean")
```

## Water temperature

```{r}
leaderboard_plots(combined, "Temp_C_mean")
leaderboard_plots(df, "Temp_C_mean")
```

## Air temperature

```{r}
leaderboard_plots(combined, "AirTemp_C_mean")
leaderboard_plots(df, "AirTemp_C_mean")
```
:::
11 changes: 6 additions & 5 deletions scoring/scoring.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ library(score4cast)
library(arrow)

past_days <- 365
n_cores <- 8
n_cores <- 4

setwd(here::here())

Expand Down Expand Up @@ -33,7 +33,10 @@ variable_duration <- arrow::open_dataset(s3_inv) |>
dplyr::distinct(variable, duration, project_id) |>
dplyr::collect()

future::plan("future::sequential", workers = n_cores)
future::plan("future::multisession", workers = n_cores)

#future::plan("future::sequential", workers = n_cores)


furrr::future_walk(1:nrow(variable_duration), function(k, variable_duration, config, endpoint){

Expand Down Expand Up @@ -62,7 +65,7 @@ furrr::future_walk(1:nrow(variable_duration), function(k, variable_duration, con
arrow::write_csv_arrow(prov, local_prov)
}

prov_df <- readr::read_csv(local_prov, col_types = "c")
prov_df <- readr::read_csv(local_prov, show_col_types = FALSE)

s3_scores_path <- s3_scores$path(glue::glue("parquet/project_id={project_id}/duration={duration}/variable={variable}"))

Expand Down Expand Up @@ -115,8 +118,6 @@ furrr::future_walk(1:nrow(variable_duration), function(k, variable_duration, con

if (!(score4cast:::prov_has(id, prov_df, "new_id"))){

print(group)

reference_dates <- unlist(stringr::str_split(group$reference_date, ","))

ref_upper <- (lubridate::as_date(ref)+lubridate::days(1))
Expand Down

0 comments on commit 6a68c62

Please sign in to comment.