-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrscript_get_cwdx.R
86 lines (74 loc) · 2.85 KB
/
rscript_get_cwdx.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env Rscript
args = commandArgs(trailingOnly=TRUE)
# args <- c(681, 7200)
library(dplyr)
library(purrr)
library(tidyr)
library(magrittr)
library(multidplyr)
library(broom)
library(rlang)
library(lubridate)
library(extRemes)
source("R/get_cwdx_byilon.R")
##------------------------------------------------------------------------
## Split the longitude indices into chunks and select the chunk handled by
## this job.
##   argument 1: 1-based index of the chunk to process in this job
##   argument 2: total number of chunks; make sure this is consistent with
##               the number of parallel jobs (job array!) in the submission
##               script
##------------------------------------------------------------------------
if (length(args) < 2) {
  stop(
    "Expected two arguments: <chunk index> <total number of chunks>",
    call. = FALSE
  )
}

ichunk <- as.integer(args[1])
nchunk <- as.integer(args[2])
if (is.na(ichunk) || is.na(nchunk) || ichunk < 1L || nchunk < 1L) {
  stop(
    "Chunk index and number of chunks must be positive integers; got '",
    args[1], "' and '", args[2], "'",
    call. = FALSE
  )
}

nlon <- 7200
nrows_chunk <- ceiling(nlon / nchunk)
vec_ilon <- seq_len(nlon)
irow_chunk <- split(vec_ilon, ceiling(seq_along(vec_ilon) / nrows_chunk))

## Each chunk holds ceiling(nlon / nchunk) indices, so fewer than nchunk
## chunks can result (e.g. only 900 chunks for nchunk = 1000). Jobs whose
## index lies beyond the last chunk have nothing to do; give them an empty
## index vector instead of failing with "subscript out of bounds".
if (ichunk > length(irow_chunk)) {
  message(
    "chunk ", ichunk, " is empty: only ", length(irow_chunk),
    " chunks exist for nchunk = ", nchunk
  )
  ilon_chunk <- integer(0)
} else {
  ilon_chunk <- irow_chunk[[ichunk]]
}

print("getting data for longitude indices:")
print(ilon_chunk)

## get all available cores (detectCores() may return NA, hence isTRUE below)
ncores <- parallel::detectCores()

if (length(ilon_chunk) == 0) {

  ## nothing to compute for this job
  df_out <- list()

} else if (isTRUE(ncores > 1)) {

  ## start one worker per core, load the required packages on each worker
  ## and make get_cwdx_byilon() available there
  cl <- multidplyr::new_cluster(ncores) %>%
    multidplyr::cluster_library(
      c("dplyr", "purrr", "tidyr", "magrittr", "extRemes", "lubridate",
        "rlang", "broom")
    ) %>%
    multidplyr::cluster_assign(get_cwdx_byilon = get_cwdx_byilon)

  ## distribute the longitude indices of this chunk across the workers and
  ## call get_cwdx_byilon() once per index; try() keeps a failure for one
  ## index from aborting the remaining ones
  df_out <- tibble(ilon = ilon_chunk) %>%
    multidplyr::partition(cl) %>%
    dplyr::mutate(out = purrr::map(ilon, ~try(get_cwdx_byilon(.))))

} else {

  ## testing: single core, process the indices sequentially
  df_out <- purrr::map(as.list(ilon_chunk), ~try(get_cwdx_byilon(.)))

}
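# ##------------------------------------------------------------------------
# ## second round (disabled, kept for reference): same procedure as above,
# ## but restricted to the longitude indices with avl_cwdx == FALSE in
# ## data/df_file_availability.RData
# ##------------------------------------------------------------------------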
# load("data/df_file_availability.RData")
# vec_ilon <- df %>%
# dplyr::filter(!avl_cwdx) %>%
# pull(ilon)
#
# nchunk <- as.integer(args[2])
# nrows_chunk <- ceiling(length(vec_ilon)/nchunk)
# irow_chunk <- split(vec_ilon, ceiling(seq_along(vec_ilon)/nrows_chunk))
#
# print("getting data for longitude indices:")
# print(irow_chunk[[as.integer(args[1])]])
#
# ## get all available cores
# ncores <- parallel::detectCores()
#
# if (ncores > 1){
#
# cl <- multidplyr::new_cluster(ncores) %>%
# multidplyr::cluster_library(c("dplyr", "purrr", "tidyr", "dplyr", "magrittr", "extRemes", "lubridate", "rlang", "broom")) %>%
# multidplyr::cluster_assign(get_cwdx_byilon = get_cwdx_byilon)
#
# ## distribute to cores, making sure all data from a specific site is sent to the same core
# df_out <- tibble(ilon = irow_chunk[[as.integer(args[1])]]) %>%
# multidplyr::partition(cl) %>%
# dplyr::mutate(out = purrr::map( ilon,
# ~try(get_cwdx_byilon(.))))
#
# } else {
#
# ## testing
# df_out <- purrr::map(as.list(irow_chunk[[as.integer(args[1])]]), ~try(get_cwdx_byilon(.)))
#
# }