notes on xarray/MDIM #145
next increment: here we write to Zarr and then open that with GDAL multidim, read at 1/10 resolution and plot
library(reticulate)
xarray <- import("xarray")
pandas <- import("pandas")
ftodate <- function(x) pandas$DatetimeIndex(list(
format(as.Date(stringr::str_extract(basename(x), "[0-9]{8}"), "%Y%m%d"))
))
da <- function(x) {
ds <- xarray$open_dataset(x)
ds$expand_dims(time = ftodate(x))
}
files <- raadtools::read_par(returnfiles = TRUE)
ds <- xarray$concat(lapply(files$fullname[1:10], da), dim = "time")
ds$to_zarr(store = "/tmp/par.zarr")
gdal <- import("osgeo.gdal")
gdal$UseExceptions()
ds <- gdal$OpenEx("/tmp/par.zarr", gdal$OF_MULTIDIM_RASTER)
rg <- ds$GetRootGroup()
rg$GetMDArrayNames()
# [1] "lat" "lon" "time" "palette" "par"
par <- rg$OpenMDArray("par")
par$GetBlockSize()
par$GetDimensionCount()
lapply(par$GetDimensions(), function(.x) .x$GetSize())
b <- par$GetView("[:,:,1]")
b$GetBlockSize()
d <- b$GetDimensions()
lapply(d, function(.x) .x$GetSize())
bytes <- b$Read()
length(bytes)
##10 * 4320 * 2
#m <- matrix(readBin(bytes, "integer", n = 4320 * 10, size = 2), 10)
#ximage::ximage(m)
b <- par$GetView("[0:1,::10,::10]")
b$GetBlockSize()
d <- b$GetDimensions()
shape <- lapply(d, function(.x) .x$GetSize())
sbytes <- b$Read()
length(sbytes)
m <- matrix(readBin(sbytes, "integer", n = length(sbytes)/2, size = 2), shape[[2]] , byrow = TRUE)
ximage::ximage(m, c(-180, 180, -90, 90))
ReadAsArray() is giving me a wonky error in the libs, so I'm unpacking the bytes above, but it should be as easy as this:
lon <- rg$OpenMDArray("lon")
lon$Read() ## I don't know if we can get these from the View() defined above, probably
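One possible route to those coordinates is each dimension's indexing variable rather than the View; a minimal Python sketch against the same /tmp/par.zarr, assuming GetIndexingVariable() behaves here as the MDIM docs describe (untested on this store):
from osgeo import gdal
gdal.UseExceptions()
ds = gdal.OpenEx("/tmp/par.zarr", gdal.OF_MULTIDIM_RASTER)
par = ds.GetRootGroup().OpenMDArray("par")
# each dimension may carry a 1D indexing variable (the coordinate array)
for dim in par.GetDimensions():
    iv = dim.GetIndexingVariable()
    if iv is not None:
        print(dim.GetName(), iv.ReadAsArray()[:5])  # ReadAsArray() errors in the build mentioned above; Read() + manual unpack is the fallback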
ah, we can do this (but the GeoTransform is not very useful ...)
cd <- b$AsClassicDataset(1L, 2L)
> cd$RasterXSize
[1] 432
> cd$RasterYSize
[1] 864
> cd$GetGeoTransform()
If we don't use a View, the geotransform is intact, but the coordinates are still not accessible from the MDArray.
So, tighten this up and there might be a few bug reports. This at least is a pretty nice way to permute dims (from multidim read to Classic).
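A sketch of that permute-to-Classic route without the intermediate View, in Python (dimension order assumed time, lat, lon, so lon is index 2 and lat index 1; untested here):
from osgeo import gdal
gdal.UseExceptions()
ds = gdal.OpenEx("/tmp/par.zarr", gdal.OF_MULTIDIM_RASTER)
par = ds.GetRootGroup().OpenMDArray("par")
# lon -> X, lat -> Y; the remaining dim (time) becomes bands
classic = par.AsClassicDataset(2, 1)
print(classic.RasterXSize, classic.RasterYSize, classic.RasterCount)
print(classic.GetGeoTransform())  # intact when no View is involved
# decimated read straight through the classic API
a = classic.GetRasterBand(1).ReadAsArray(buf_xsize=classic.RasterXSize // 10,
                                         buf_ysize=classic.RasterYSize // 10)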
Works fine on GDAL autotest examples, so we might have a report-worthy example (so long as my xarray write is up to scratch, do that all in python on same system).
works fine here, so maybe it's specifically a netcdf-related thing
import xarray
ds = xarray.open_dataset("autotest/gdrivers/data/zarr/array_dimensions.zarr")
ds
<xarray.Dataset> Size: 7kB
Dimensions: (lat: 20, lon: 40)
Coordinates:
* lat (lat) float64 160B 4.662e-310 0.0 ... 1.128e-310 1.128e-310
* lon (lon) float64 320B 4.662e-310 0.0 ... 1.128e-310 1.128e-310
Data variables:
var (lat, lon) float64 6kB ...
ds.to_zarr("../var.zarr")
<xarray.backends.zarr.ZarrStore object at 0x14c406def940>
from osgeo import gdal
gdal.UseExceptions()
ds = gdal.OpenEx("../var.zarr", gdal.OF_MULTIDIM_RASTER)
rg = ds.GetRootGroup()
var = rg.OpenMDArray("var")
var.ReadAsArray()
array([[0.00000e+000, 0.00000e+000, 0.00000e+000, 0.00000e+000,
works fine for these ones ... so I still have to try on the par above where I'm munging my own time coord
import xarray
unc = ["https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/202202/oisst-avhrr-v02r01.20220218.nc#mode=bytes",
"https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/202202/oisst-avhrr-v02r01.20220219.nc#mode=bytes"]
arrds = [xarray.open_dataset(x, engine = "netcdf4", chunks = {}) for x in unc]
ds = xarray.concat(arrds, dim = "time")
ds
<xarray.Dataset> Size: 33MB
Dimensions: (time: 2, zlev: 1, lat: 720, lon: 1440)
Coordinates:
* time (time) datetime64[ns] 16B 2022-02-18T12:00:00 2022-02-19T12:00:00
* zlev (zlev) float32 4B 0.0
* lat (lat) float32 3kB -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88
* lon (lon) float32 6kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
Data variables:
sst (time, zlev, lat, lon) float32 8MB nan nan nan ... -1.8 -1.8 -1.8
anom (time, zlev, lat, lon) float32 8MB nan nan nan nan ... 0.0 0.0 0.0
err (time, zlev, lat, lon) float32 8MB nan nan nan nan ... 0.3 0.3 0.3
ice (time, zlev, lat, lon) float32 8MB nan nan nan ... 0.98 0.98 0.98
Attributes: (12/37)
Conventions: CF-1.6, ACDD-1.3
title: NOAA/NCEI 1/4 Degree Daily Optimum Interpolat...
references: Reynolds, et al.(2007) Daily High-Resolution-...
source: ICOADS, NCEP_GTS, GSFC_ICE, NCEP_ICE, Pathfin...
id: oisst-avhrr-v02r01.20220218.nc
naming_authority: gov.noaa.ncei
... ...
time_coverage_start: 2022-02-18T00:00:00Z
time_coverage_end: 2022-02-18T23:59:59Z
metadata_link: https://doi.org/10.25921/RE9P-PT57
ncei_template_version: NCEI_NetCDF_Grid_Template_v2.0
comment: Data was converted from NetCDF-3 to NetCDF-4 ...
sensor: Thermometer, AVHRR
ds.to_zarr("../oisst.zarr")
<xarray.backends.zarr.ZarrStore object at 0x14c466dc95c0>
from osgeo import gdal
gds = gdal.OpenEx("../oisst.zarr", gdal.OF_MULTIDIM_RASTER)
rg = gds.GetRootGroup()
rg.GetMDArrayNames()
['lat', 'lon', 'time', 'zlev', 'anom', 'err', 'ice', 'sst']
sst = rg.OpenMDArray("sst")
a = sst.ReadAsArray()
a.shape
(2, 1, 720, 1440)
Here's something from the MDIM world, in python/reticulate for now
#f <- "/rdsi/PUBLIC/raad/data/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/198109/oisst-avhrr-v02r01.19810901.nc"
u <- "/vsicurl/https://github.com/mdsumner/fixoisst/raw/main/file.nc"
library(reticulate)
gdal <- import("osgeo.gdal")
gdal$UseExceptions()
drv <- gdal$GetDriverByName("VRT")
ds <- drv$CreateMultiDimensional("myds")
rg <-ds$GetRootGroup()
None <- reticulate::py_none()
dim0 = rg$CreateDimension("time", None, None, 1L)
dim1 = rg$CreateDimension("zlev", None, None, 1L)
dim2 = rg$CreateDimension("lat", None, None, 360L)
dim3 = rg$CreateDimension("lon", None, None, 360L)
#https://gdal.org/api/python/mdim_api.html#osgeo.gdal.MDArray.GetResampled
ids <- gdal$OpenEx(u, gdal$OF_MULTIDIM_RASTER)
rg2 <- ids$GetRootGroup()
sst <- rg2$OpenMDArray("sst")
#GetResampled(MDArray self, int nDimensions, GDALRIOResampleAlg resample_alg, OSRSpatialReferenceShadow ** srs, char ** options=None)→ MDArray
osr <- import("osgeo.osr")
crs <- osr$SpatialReference()
crs$SetFromUserInput("+proj=laea +lon_0=147")
v <- sst$GetResampled(list(dim0, dim1, dim2, dim3), gdal$GRA_Bilinear, crs)
a <- v$Read()
ximage::ximage(matrix(readBin(a, "integer", size = 2, n = 360*360, signed = F), 360, byrow = TRUE))
v$AsClassicDataset(2L, 3L)$GetGeoTransform()
[[1]]
[1] 10223952
[[2]]
[1] -56784.64
[[3]]
[1] 0
[[4]]
[1] -12230967
[[5]]
[1] 0
[[6]]
[1] 44017.02
issues obviously with nodata and scaling and the usual antimeridian stuff, and what's with not having a target extent for the output grid?? maybe just that only Even has used this so far?? ... but this is promising
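For the nodata/scaling part, the MDIM API does expose the pieces; a hedged sketch on the same file.nc, assuming GetUnscaled() applies scale/offset and maps out nodata the way the MDArray docs describe:
from osgeo import gdal
gdal.UseExceptions()
u = "/vsicurl/https://github.com/mdsumner/fixoisst/raw/main/file.nc"
ds = gdal.OpenEx(u, gdal.OF_MULTIDIM_RASTER)
sst = ds.GetRootGroup().OpenMDArray("sst")
print(sst.GetScale(), sst.GetOffset(), sst.GetNoDataValueAsDouble())
# derived array in physical units with nodata handled; GetResampled()/Read() then work on this
unscaled = sst.GetUnscaled()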
this works on pawsey, the ReadAsArray problem is local to aceecostats, some mix of GDAL libs lying around from my source builds
#wget https://github.com/mdsumner/fixoisst/raw/main/file.nc
library(reticulate)
gdal <- import("osgeo.gdal")
gdal$UseExceptions()
ds <- gdal$OpenEx("file.nc", gdal$OF_MULTIDIM_RASTER)
rg <- ds$GetRootGroup()
sst <- rg$OpenMDArray("sst")
gdal$VersionInfo()
#[1] "3090000"
bd <- gdal$ExtendedDataType$Create(gdal$GDT_UInt16)
d <- sst$ReadAsArray(buffer_datatype = bd)
[1] "3100000"
> range(d)
[1] 0 3240
and perhaps more interesting
gdal$SetConfigOption("AWS_NO_SIGN_REQUEST", "YES")
ds <- gdal$OpenEx("/vsis3/mur-sst/zarr", gdal$OF_MULTIDIM_RASTER)
(ds$GetRootGroup()$OpenMDArray("analysed_sst")$GetDimensions()[[1]])$GetSize()
[1] 6443
> (ds$GetRootGroup()$OpenMDArray("analysed_sst")$GetDimensions()[[2]])$GetSize()
[1] 17999
> (ds$GetRootGroup()$OpenMDArray("analysed_sst")$GetDimensions()[[3]])$GetSize()
[1] 36000
bd <- gdal$ExtendedDataType$Create(gdal$GDT_UInt16)
sst <- ds$GetRootGroup()$OpenMDArray("analysed_sst")
str(sst$ReadAsArray(buffer_datatype=bd, count = c(2L, 24L, 56L)))
int [1:2, 1:24, 1:56] 0 0 0 0 0 0 0 0 0 0 ...
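A hedged Python sketch of pulling an interior window rather than the corner, assuming ReadAsArray() honours array_start_idx alongside count (as the MDIM Python API suggests):
from osgeo import gdal
gdal.UseExceptions()
gdal.SetConfigOption("AWS_NO_SIGN_REQUEST", "YES")
ds = gdal.OpenEx("/vsis3/mur-sst/zarr", gdal.OF_MULTIDIM_RASTER)
sst = ds.GetRootGroup().OpenMDArray("analysed_sst")
bd = gdal.ExtendedDataType.Create(gdal.GDT_UInt16)
# a 2 x 240 x 560 window offset into (time, lat, lon)
a = sst.ReadAsArray(array_start_idx=[6000, 9000, 18000],
                    count=[2, 240, 560],
                    buffer_datatype=bd)
print(a.shape)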
Here we use the 'https://..nc#mode=bytes' method for xarray/netcdf4, then slice with reticulate idioms
library(reticulate)
xarray <- import("xarray")
#ym: 198109
#ymd: 19810901
dates <- seq(as.Date("1981-09-01"), by = "1 day", length.out = 10)
ym <- format(dates, "%Y%m")
ymd <- format(dates, "%Y%m%d")
pat <- "https://projects.pawsey.org.au/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/%s/oisst-avhrr-v02r01.%s.nc#mode=bytes"
u <- sprintf(pat, ym, ymd)
## can't do this with chunks = list()
library(furrr)
plan(multisession)
arrds <- future_map(u, xarray$open_dataset, engine = "netcdf4", chunks = py_eval("{}"))
plan(sequential)
x <- xarray$concat(arrds, dim = "time")
#py_run_string("result = r.x.sel(time = slice('1981-09-01', '1981-09-04'))")
#py$result
## OR https://github.com/rstudio/reticulate/issues/122
py_slice <- import_builtins()$slice
x$sel(time = py_slice('1981-09-01', '1981-09-04'), lon = py_slice(130, 150))
here we use kerchunk to reference existing netcdf files as Zarr; it's a mix of python via reticulate and R, we get a July's worth of OISST and convert to Zarr via kerchunk
## use bowerbird to get a month's worth of SST (no auth required)
library(bowerbird)
library(blueant)
datadir <- tempdir()
if (!file.exists(datadir)) dir.create(datadir)
cf <- bb_config(local_file_root = datadir)
## do it as a list because in other contexts we parallelize
filt <- 202407
srcslist <- purrr::map(filt, \(.x) {
blueant::sources("NOAA OI 1/4 Degree Daily SST AVHRR") |>
bb_modify_source(method = list(accept_follow = sprintf("avhrr/%i.*", .x), accept_download = sprintf(".*%i.*nc$", .x)))
})
cflist<- lapply(srcslist, \(.x) bb_add(cf, .x))
status <- furrr::future_map(cflist, \(.cf) bb_sync(.cf, verbose = TRUE, confirm_downloads_larger_than = -1))
files <- dplyr::bind_rows(lapply(status, \(.x) .x$files))
## now convert to kerchunk jsons, combine them to one virtual Zarr
pycode <-
'from kerchunk.hdf import SingleHdf5ToZarr
import ujson
import fsspec
def generate_kerchunk(u):
so = dict(
mode="rb", anon=False, default_fill_cache=False,
default_cache_type="none"
)
## do not yet know how important fsspec is, it hides stuff about remote storage
## or how to open just the URL
with fsspec.open(u, **so) as inf:
h5chunks = SingleHdf5ToZarr(inf, u, inline_threshold = 0)
return ujson.dumps(h5chunks.translate())
'
library(reticulate)
py <- py_run_string(pycode)
## generate all the jsons
jsondir <- file.path(tempdir(check = TRUE), "jsons")
dir.create(jsondir)
fs::file_delete(fs::dir_ls(jsondir))
for (i in seq_along(files$file)) {
u <- files$file[i]
writeLines(py$generate_kerchunk(u), file.path(jsondir, sprintf("%s.json", basename(u))))
}
jsons <- fs::dir_ls(jsondir)
kerchunk.combine <- import("kerchunk.combine")
mzz <- kerchunk.combine$MultiZarrToZarr(
jsons,
remote_protocol = py_none(),
remote_options = py_none(),
concat_dims='time',
inline_threshold=0
)
mzz$translate("combined.json")
xarray$open_dataset("combined.json")
I don't yet know how or if we can use the NetCDF library itself to open that json.
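I can't speak to the NetCDF library itself, but for reference the explicit fsspec route to consuming that reference JSON (rather than relying on engine autodetection) is roughly this, under the usual kerchunk conventions:
import fsspec
import xarray
fs = fsspec.filesystem("reference", fo="combined.json")
ds = xarray.open_dataset(fs.get_mapper(""), engine="zarr",
                         backend_kwargs={"consolidated": False})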
does seem to scale pretty well, here using a few thousand files from the full collection
<xarray.Dataset> Size: 76GB
Dimensions: (time: 2278, zlev: 1, lat: 720, lon: 1440)
Coordinates:
* lat (lat) float32 3kB -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88
* lon (lon) float32 6kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
* time (time) datetime64[ns] 18kB 1981-09-01T12:00:00 ... 1987-11-26T12...
* zlev (zlev) float32 4B 0.0
Data variables:
anom (time, zlev, lat, lon) float64 19GB dask.array<chunksize=(1, 1, 720, 1440), meta=np.ndarray>
err (time, zlev, lat, lon) float64 19GB dask.array<chunksize=(1, 1, 720, 1440), meta=np.ndarray>
ice (time, zlev, lat, lon) float64 19GB dask.array<chunksize=(1, 1, 720, 1440), meta=np.ndarray>
sst (time, zlev, lat, lon) float64 19GB dask.array<chunksize=(1, 1, 720, 1440), meta=np.ndarray>
Attributes: (12/37)
Conventions: CF-1.6, ACDD-1.3
cdm_data_type: Grid
comment: Data was converted from NetCDF-3 to NetCDF-4 ...
creator_email: oisst-help@noaa.gov
creator_url: https://www.ncei.noaa.gov/
date_created: 2020-05-08T19:05:13Z
... ...
source: ICOADS, NCEP_GTS, GSFC_ICE, NCEP_ICE, Pathfin...
standard_name_vocabulary: CF Standard Name Table (v40, 25 January 2017)
summary: NOAAs 1/4-degree Daily Optimum Interpolation ...
time_coverage_end: 1981-09-01T23:59:59Z
time_coverage_start: 1981-09-01T00:00:00Z
title: NOAA/NCEI 1/4 Degree Daily Optimum Interpolat...
yep, it fully does the Dask thing on its own, I did this; when you get $values it invokes the computation across cores.
## we need the python slice() in R so we can use it in sel()
py_slice <- import_builtins()$slice
ds <- xarray$open_dataset("combined.json", chunks = dict())
## pick one year for one variable, and group it by month
y2 <- ds$sst$sel(#time = py_slice("2000-01-01", "2000-12-31"),
lon = py_slice(140, 160),
lat = py_slice(-50, -30))$
groupby("time.month")$mean("time")
## restore our georeferencing
resx <- function(x) diff(x[1:2])
pyvals <- function(x) x$values
restore_lim <- function(...) {
res <- lapply(list(...), resx)
lim <- lapply(list(...), range)
ex <- vector("list", length(res))
for( i in seq_along(res)) {
ex[[i]] <- lim[[i]] + c(-1, 1) * .5 * res[[i]]
}
unlist(ex)
}
lon <- pyvals(y2$lon); lat <- pyvals(y2$lat)  ## pull the coordinate values back into R
ex <- restore_lim(lon, lat)
v <- y2$values
ximage::ximage(v[1,1,dim(v)[3]:1,], extent = ex, asp = 1)
maps::map(add = TRUE)
I think for the NetCDF lib itself we need the thing written out to Zarr in that Parquet form.
the new kerchunk is VirtualiZarr, this should work but I can't get it to yet
from virtualizarr import open_virtual_dataset
import glob
import xarray
nc = glob.glob("bowerbird/**/*.nc", recursive = True)
virtual_datasets = [
open_virtual_dataset(filepath, indexes = {})
for filepath in nc
]
virtual_ds = xarray.combine_nested(virtual_datasets, concat_dim=['time'])
# cache the combined dataset pattern to disk, in this case using the existing kerchunk specification for reference files
virtual_ds.virtualize.to_kerchunk('virt_oisst.json', format='json')
looks like it's related to this but IDK yet: zarr-developers/VirtualiZarr#114
I had to do
from virtualizarr import open_virtual_dataset
import glob
import xarray
nc = glob.glob("bowerbird/**/*.nc", recursive = True)
nc.sort()
vd = [
open_virtual_dataset(filepath, indexes = {})
for filepath in nc
]
vds = xarray.concat(vd, dim = 'time', coords = 'minimal', compat = 'override')
vds.virtualize.to_kerchunk('virt_oisst.json', format='json')
there's a problem for VirtualiZarr here because at this transition the time changed from Float64 to Float32
when I open them separately virtually it's ok
but with concat, that Float32 bites (also open_mfdataset doesn't detect and fix it; open_dataset just sets it to Float64 always)
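A quick way to see which file introduces the dtype change before the concat blows up; a minimal check over the virtual datasets already opened above (vd and nc as in the previous snippet):
for path, v in zip(nc, vd):
    if v["time"].dtype != vd[0]["time"].dtype:
        print(path, v["time"].dtype)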
the fix is "loadable variables", which is the same as inlining for kerchunk. I parallelize with dask as well, so the whole thing took about 60s
import dask.multiprocessing
dask.config.set(scheduler='processes', num_workers = 128)
from virtualizarr import open_virtual_dataset
import glob
import xarray
nc = glob.glob("bowerbird/**/*.nc", recursive = True)
nc.sort()
def open_virtual(filepath):
return open_virtual_dataset(filepath, indexes = {},
loadable_variables=['lon', 'lat', 'time', 'zlev'],
cftime_variables=['time'])
vdd = [
dask.delayed(open_virtual)(filepath)
for filepath in nc
]
vd = dask.compute(vdd)
vds = xarray.concat(vd[0], dim = 'time', coords = 'minimal', compat = 'override')
vds.virtualize.to_kerchunk('virt_oisst.json', format='json')
xarray.open_dataset("virt_oisst.json")
<xarray.Dataset> Size: 520GB
Dimensions: (time: 15680, zlev: 1, lat: 720, lon: 1440)
Coordinates:
* lat (lat) float32 3kB -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88
* lon (lon) float32 6kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
* time (time) datetime64[ns] 125kB 1981-09-01T12:00:00 ... 2024-08-05T1...
* zlev (zlev) float32 4B nan
Data variables:
anom (time, zlev, lat, lon) float64 130GB ...
err (time, zlev, lat, lon) float64 130GB ...
ice (time, zlev, lat, lon) float64 130GB ...
sst (time, zlev, lat, lon) float64 130GB ...
Attributes: (12/37)
Conventions: CF-1.6, ACDD-1.3
cdm_data_type: Grid
comment: Data was converted from NetCDF-3 to NetCDF-4 ...
creator_email: oisst-help@noaa.gov
creator_url: https://www.ncei.noaa.gov/
date_created: 2020-05-08T19:05:13Z
... ...
source: ICOADS, NCEP_GTS, GSFC_ICE, NCEP_ICE, Pathfin...
standard_name_vocabulary: CF Standard Name Table (v40, 25 January 2017)
summary: NOAAs 1/4-degree Daily Optimum Interpolation ...
time_coverage_end: 1981-09-01T23:59:59Z
time_coverage_start: 1981-09-01T00:00:00Z
title: NOAA/NCEI 1/4 Degree Daily Optimum Interpolat...
aand, we can then use kerchunk (or VirtualiZarr ... WIP) to express that as a sharded parquet index:
from kerchunk.df import refs_to_dataframe
refs_to_dataframe("virt_oisst.json", "virt_oisst.zarr") ## this is Parquet, but see VirtualiZarr has options in virtualizarr.xarray.VirtualiZarrDatasetAccessor
arrow::open_dataset("virt_oisst.zarr")
FileSystemDataset with 8 Parquet files
4 columns
path: string
offset: int64 not null
size: int64 not null
raw: binary
See $metadata for additional Schema metadata
> arrow::open_dataset("virt_oisst.zarr") |> dplyr::collect()
# A tibble: 800,000 × 4
path offset size raw
<chr> <int> <int> <arr>
1 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 684063 NULL
2 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 686733 NULL
3 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 685793 NULL
4 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 685412 NULL
5 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 684732 NULL
6 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 684598 NULL
7 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 683830 NULL
8 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 683567 NULL
9 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 683112 NULL
10 bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature… 47747 682749 NULL
# ℹ 799,990 more rows
# ℹ Use `print(n = ...)` to see more rows
That doesn't record the hierarchy, which is weird, but I think it can do it (something like ...)
compare to this, which is 2 months stale
reading that json (parsed into R as js here), see that the loaded vars are stored encoded,
and see how the reference offsets and lengths vary per chunk:
js$refs$"ice/0.0.0.0"
[1] "bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/198109/oisst-avhrr-v02r01.19810901.nc"
[2] "954024"
[3] "70081"
> js$refs$"ice/15679.0.0.0"
[1] "bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/202408/oisst-avhrr-v02r01.20240805_preliminary.nc"
[2] "1495218"
[3] "60237"
> js$refs$"ice/15678.0.0.0"
[1] "bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/202408/oisst-avhrr-v02r01.20240804_preliminary.nc"
[2] "1490331"
[3] "60462"
> js$refs$"sst/0.0.0.0"
[1] "bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/198109/oisst-avhrr-v02r01.19810901.nc"
[2] "1025881"
[3] "683590"
> js$refs$"sst/15679.0.0.0"
[1] "bowerbird/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/202408/oisst-avhrr-v02r01.20240805_preliminary.nc"
[2] "47599"
[3] "662928"
and trying the obvious "transfer to another system" by subbing the source paths
l <- readLines("virt_oisst.json") ## on pawsey
## now on aceecostats
l2 <- gsub("bowerbird\\\\/", "/rdsi/PUBLIC/raad/data/", l)
> writeLines(l2, "local.json")
> library(reticulate)
>
> xarray <- import("xarray")
> xarray$open_dataset("local.json")
<xarray.Dataset> Size: 520GB
Dimensions: (time: 15680, zlev: 1, lat: 720, lon: 1440)
Coordinates:
* lat (lat) float32 3kB -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88
* lon (lon) float32 6kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
* time (time) datetime64[ns] 125kB 1981-09-01T12:00:00 ... 2024-08-05T1...
* zlev (zlev) float32 4B nan
not bad for an easy hack
v <- ds$sel(time = py_slice('2024-08-05', '2024-08-05'), lon = py_slice(130, 150), lat = py_slice(-60, -30))$sst$values
[1] 1 1 120 80
here is a distillation of how the core of kerchunk works to get the references, just because I feel like this could be handy for refreshing or checking or appending to a serialized dataset
f = "somenetcdffile.nc"
import h5py
def get_key(blob):
return tuple([a // b for a, b in zip(blob.chunk_offset, chunk_size)])
def store_chunk_info(blob):
stinfo[get_key(blob)] = {"offset": blob.byte_offset, "size": blob.size}
d = h5py.File(f)
dset = d["sst"]
dsid = dset.id
dset.chunks
#(1, 1, 720, 1440)
stinfo = dict()
chunk_size = dset.chunks
dsid.chunk_iter(store_chunk_info)
stinfo
{(0, 0, 0, 0): {'offset': 1011319, 'size': 674027}}
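Those offsets and sizes are exactly what end up as the reference entries; a sketch of shaping them into kerchunk-style refs, with the key layout assumed from the json shown earlier:
# "var/i.j.k.l" -> [path, offset, size], matching e.g. "sst/0.0.0.0" in the combined json
refs = {
    "sst/" + ".".join(str(i) for i in key): [f, info["offset"], info["size"]]
    for key, info in stinfo.items()
}
# {'sst/0.0.0.0': ['somenetcdffile.nc', 1011319, 674027]}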
I had a go at rhdf5 but omg what a hard thing to use ... no idea
A direct comparison for terra and xarray
library(reticulate)
xarray <- import("xarray")
ds <- xarray$open_dataset("virt_oisst.json", chunks = reticulate::dict())
system.time(v <- ds$sel(lon = 147, lat = -48, method = "nearest")$sst$values)
# user system elapsed
#142.690 65.409 42.750
library(furrr)
library(terra)
library(fs)
f <- dir_ls("bowerbird/www.ncei.noaa.gov", recurse = T, regexp = ".*\\.nc$")
fun <- function(x, cell) extract(rast(x, "sst"), cell, raw = TRUE)
cell <- cellFromXY(rast(f[1]), cbind(147, -48))
plan(multicore)
system.time({
v2 <- future_map_dbl(f, fun, cell = cell)
})
# user system elapsed
#308.310 159.028 6.434
plan(sequential)
this is much slower
xfun <- function(x, pt) xarray$open_dataset(x, chunks = dict())$sst$sel(lon = pt[1], lat = pt[2], method = "nearest")$values
library(furrr)
plan(multicore)
system.time(v3 <- future_map(f, xfun, pt = cbind(147, -48)))
plan(sequential)
trying to open in parallel segfaults
f = glob.glob("bowerbird/www.ncei.noaa.gov/**/*.nc", recursive = True)
ds = xarray.open_mfdataset(f, parallel = True, chunks = {})
prevent segfault with ...
why can't xarray do this?
setwd("/scratch/pawsey0973/mdsumner")
f <- fs::dir_ls("bowerbird/www.ncei.noaa.gov", recurse = TRUE, regexp = ".*nc$")
library(reticulate)
xarray <- import("xarray")
system.time(ds <- xarray$open_mfdataset(f, parallel = TRUE, engine = "h5netcdf", chunks = dict()))
/usr/local/lib/python3.10/dist-packages/pyproj/__init__.py:89: UserWarning: pyproj unable to set database path.
_pyproj_global_context_initialize()
Error in py_call_impl(callable, call_args$unnamed, call_args$named) :
ValueError: Not a file or file object (not a file or file object)
Run `reticulate::py_last_error()` for details.
Timing stopped at: 23.45 11.02 22.35
This takes seconds
library(furrr)
plan(multicore)
l <- future_map(f, \(.x) terra::extract(terra::rast(.x, "sst"), 1e4, raw = TRUE))
range(unlist(l))
#[1] -1.80 0.07
trying to write an actual zarr from the virtual one
import xarray
ds = xarray.open_dataset("virt_oisst.json", chunks = {})
ds.time.attrs = {}
ds = ds.sst.chunk({'time': 240, 'zlev': 1, 'lat': 60, 'lon': 120})
nds = ds.to_zarr("virt_chunks_oisst.zarr", mode = "w")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/mdsumner/.local/lib/python3.10/site-packages/xarray/core/dataarray.py", line 4352, in to_zarr
return to_zarr( # type: ignore[call-overload,misc]
File "/home/mdsumner/.local/lib/python3.10/site-packages/xarray/backends/api.py", line 1763, in to_zarr
dump_to_store(dataset, zstore, writer, encoding=encoding)
File "/home/mdsumner/.local/lib/python3.10/site-packages/xarray/backends/api.py", line 1450, in dump_to_store
store.store(variables, attrs, check_encoding, writer, unlimited_dims=unlimited_dims)
File "/home/mdsumner/.local/lib/python3.10/site-packages/xarray/backends/zarr.py", line 720, in store
self.set_variables(
File "/home/mdsumner/.local/lib/python3.10/site-packages/xarray/backends/zarr.py", line 766, in set_variables
encoding = extract_zarr_variable_encoding(
File "/home/mdsumner/.local/lib/python3.10/site-packages/xarray/backends/zarr.py", line 284, in extract_zarr_variable_encoding
chunks = _determine_zarr_chunks(
File "/home/mdsumner/.local/lib/python3.10/site-packages/xarray/backends/zarr.py", line 198, in _determine_zarr_chunks
raise ValueError(
ValueError: Specified zarr chunks encoding['chunks']=(1, 1, 720, 1440) for variable named 'sst' would overlap multiple dask chunks ((240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 80), (1,), (60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60), (120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120)). Writing this array in parallel with dask could lead to corrupted data. Consider either rechunking using `chunk()`, deleting or modifying `encoding['chunks']`, or specify `safe_chunks=False`.
## none of the other ways can work, must be a virtualizarr detail
nds = ds.to_zarr("virt_chunks_oisst.zarr", mode = "w", safe_chunks = False)
#ds = xarray.open_dataset("virt_chunks_oisst.zarr", chunks = {})
#ds.sel(lon = 147, lat = -48, method = "nearest").sst.values
that seems insanely slow; does sel()/isel() have to open every chunk into memory to read one pixel?? because at that first level, each file is a chunk
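One way to sanity-check that suspicion is to compare the chunking the store declares with the chunking dask actually uses; a minimal probe on the virtual json (nothing definitive):
import xarray
ds = xarray.open_dataset("virt_oisst.json", chunks = {})
print(ds.sst.encoding.get("chunks"))  # storage chunks: expect (1, 1, 720, 1440), i.e. one file per time step
print(ds.sst.chunksizes)              # dask chunks actually used when computing a sel()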
rechunker, something like this
import xarray
from rechunker import rechunk
ds = xarray.open_dataset("virt_chunks_oisst.zarr", chunks = {})
ds.time.attrs = {}
r = rechunk(ds, target_chunks = {'time' : 120, 'zlev' : 1, 'lat' : 60, 'lon' : 120}, target_store = "oisst.zarr", temp_store = "intermediate2.zarr", max_mem = 5000000)
r.execute()
check out this example, submitted as a PR to VirtualiZarr, running files into a referenced xarray via Coiled (reduce done on the local machine)
using lessons from that PR
import numpy as np
import xarray as xr
import dask
from virtualizarr import open_virtual_dataset
from virtualizarr.backend import FileType
@dask.delayed
def process(filename):
vds = open_virtual_dataset(
filename,
decode_times=True,
loadable_variables=["time", "zlev", "lat", "lon"],
filetype=FileType("netcdf4"),
indexes={},
)
return vds
days = [x + 1 for x in range(30)]
urls = [f"https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/198109/oisst-avhrr-v02r01.198109{day:02}.nc" for day in days]
## now we push out over our cluster (not using coiled)
results = dask.compute([process(xnc) for xnc in urls])
vds = xr.concat(results[0], dim = 'time', coords = 'minimal', compat = 'override')
vds.virtualize.to_kerchunk("oisst.parquet", format="parquet")
ds = xr.open_dataset("oisst.parquet")
ds.isel(time = 0).sst.sel(lon = 147, lat = -48, method = "nearest").values
#array([9.41999979])
just me slowly ramping up to crew
Sys.setenv("AWS_NO_SIGN_REQUEST" = "YES")
Sys.setenv("AWS_S3_ENDPOINT" = "projects.pawsey.org.au")
library(gdalraster)
root <- "/vsis3/www.ncei.noaa.gov/"
files <- tibble::tibble(fullname = sprintf("%s%s", root, vsi_read_dir(root, recursive = TRUE)))
files <- dplyr::mutate(files, date = as.POSIXct(as.Date(stringr::str_extract(basename(fullname),
"[0-9]{8}"), "%Y%m%d"), tz = "UTC"))
## this will drop preliminary files in favour of same-date final versions
files <- dplyr::arrange(dplyr::distinct(files, date, .keep_all = TRUE),date) |> dplyr::select(date, fullname)
extra <- function(x) terra::extract(terra::rast(x, "sst"), 795469)
library(furrr)
library(terra)
n <- 128
#system.time(future_map(sample(files$fullname, n), extra))
## about 6 seconds (because multiple tasks in one call)
plan(multicore)
system.time(future_map(sample(files$fullname, n), extra))
plan(sequential)
library(crew)
controller <- crew_controller_local(
name = "example",
workers = parallelly::availableCores(),
seconds_idle = 10
)
## a bit more than 10 seconds (because we pushed a task for each, and all input is copied every time)
system.time({
controller$start()
for (i in 1:128) {
controller$push(name = sprintf("extra%i", i), command = extra(files$fullname[i]), globals = list(extra = extra, files = files, i = i))
}
controller$wait(mode = "all")
})
ok yeh boi
Sys.setenv("AWS_NO_SIGN_REQUEST" = "YES")
Sys.setenv("AWS_S3_ENDPOINT" = "projects.pawsey.org.au")
library(gdalraster)
root <- "/vsis3/www.ncei.noaa.gov/"
files <- tibble::tibble(fullname = sprintf("%s%s", root, vsi_read_dir(root, recursive = TRUE)))
files <- dplyr::mutate(files, date = as.POSIXct(as.Date(stringr::str_extract(basename(fullname),
"[0-9]{8}"), "%Y%m%d"), tz = "UTC"))
## this will drop preliminary files in favour of same-date final versions
files <- dplyr::arrange(dplyr::distinct(files, date, .keep_all = TRUE),date) |> dplyr::select(date, fullname)
extra <- function(x) terra::extract(terra::rast(x, "sst"), 795469)
library(furrr)
library(terra)
library(crew)
controller <- crew_controller_local(
name = "example",
workers = parallelly::availableCores(),
seconds_idle = 10
)
system.time({
results <- controller$map(
command = extra(file),
iterate = list(
file = files$fullname
),
globals = list(extra = extra)
)
})
15664 tasks in 1m 43.6s
just a few rasterio tricks, how to get XML out of it
library(reticulate)
rioxarray <- import("rioxarray")
rasterio <- import("rasterio")
affine <- import("affine")
dsn <- "<GDAL_WMS><Service name=\"VirtualEarth\"><ServerUrl>http://a${server_num}.ortho.tiles.virtualearth.net/tiles/a${quadkey}.jpeg?g=90</ServerUrl></Service><MaxConnections>4</MaxConnections><Cache/></GDAL_WMS>"
crs <- "EPSG:4326"
transform <- vaster::extent_dim_to_gt(c(100, 120, -40, -30), c(1024, 512))
aff <- affine$Affine$from_gdal(transform[1], transform[2], transform[3], transform[4], transform[5], transform[6])
ds <- rasterio$open(dsn)
vrt <- rasterio$vrt$WarpedVRT(ds, height = 512, width = 1024, crs = crs, transform = aff)
xa <- rioxarray$open_rasterio(vrt)
v <- xa$values
rasterio.shutil <- import("rasterio.shutil")
rasterio.io <- import("rasterio.io")
url <- "/vsicurl/https://storage.googleapis.com/gcp-public-data-landsat/LC08/01/047/027/LC08_L1TP_047027_20130421_20170310_01_T1/LC08_L1TP_047027_20130421_20170310_01_T1_B4.TIF"
src <- rasterio$open(url)
vrt <- rasterio$vrt$WarpedVRT(src, crs = "EPSG:4326", width = 128, height = 128)
mem <- rasterio.io$MemoryFile()
rasterio.shutil$copy(vrt, mem$name, driver = "VRT")
vrt_xml <- mem$read()$decode('utf-8')
## we could open 'vrt' itself, but we can also put this xml somewhere and then open as xarray DataArray
rds <- rioxarray$open_rasterio(vrt_xml)
So, clone all of OISST
Virtualize (note this usage is different from the above, as of recent weeks)
drop zlev makes! it! work!
import xarray
from virtualizarr import open_virtual_dataset
aws_credentials = {"endpoint_url": "https://projects.pawsey.org.au", "key": key, "secret": secret}
u = 's3://idea-10.7289-v5sq8xb5/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/198509/oisst-avhrr-v02r01.19850902.nc'
vds = open_virtual_dataset(u, reader_options={'storage_options': aws_credentials}, loadable_variables = ['time', 'lon', 'lat'], decode_times = True, indexes = {}, drop_variables = "zlev")
vds.virtualize.to_kerchunk('combined.json', format='json')
xarray.open_dataset("combined.json") |
here's the VirtualiZarr concat process
## still needing key/secret in python, need to explore what packages expect
aws_credentials = {"endpoint_url": "https://projects.pawsey.org.au", "key": key, "secret": secret}
import dask.multiprocessing
dask.config.set(scheduler='processes', num_workers = 24)
from virtualizarr import open_virtual_dataset
## load in objects from store/bucket, fstring out the s3://bucket/key, open all up and write to json, push that to vzarr/ bucket
import pyarrow.parquet as pq
import pyarrow as pa
from pyarrow import fs
s3 = fs.S3FileSystem(endpoint_override = "projects.pawsey.org.au")
s3table = pq.read_table("idea-objects/idea-objects.parquet", filesystem = s3, filters=[('Bucket','==', 'idea-10.7289-v5sq8xb5')])
nclist = s3table.to_pandas().Key.tolist()
nc = [f's3://idea-10.7289-v5sq8xb5/{nc0}' for nc0 in nclist]
nc.sort()
def open_virtual(filepath, creds):
return open_virtual_dataset(filepath, indexes = {},
loadable_variables=['lon', 'lat', 'time'], drop_variables = ['zlev'],
decode_times = True, reader_options={'storage_options': creds})
vdd = [
dask.delayed(open_virtual)(filepath, aws_credentials)
for filepath in nc
]
vd = dask.compute(vdd)
import xarray
vds = xarray.concat(vd[0], dim = 'time', coords = 'minimal', compat = 'override')
vds.virtualize.to_kerchunk('virt_oisst.json', format='json')
xarray.open_dataset("virt_oisst.json")
<xarray.Dataset> Size: 532GB
Dimensions: (time: 16040, zlev: 1, lat: 720, lon: 1440)
Coordinates:
* lat (lat) float32 3kB -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88
* lon (lon) float32 6kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
* time (time) datetime64[ns] 128kB 1981-09-01T12:00:00 ... 2024-10-25T1...
Dimensions without coordinates: zlev
Data variables:
anom (time, zlev, lat, lon) float64 133GB ...
err (time, zlev, lat, lon) float64 133GB ...
ice (time, zlev, lat, lon) float64 133GB ...
sst (time, zlev, lat, lon) float64 133GB ...
... getting a (one time) error from each thread, but rasterio shouldn't be involved at all, maybe it's when it tries to detect the engine?
"sst/16038.0.0.0" failed to fetch target ['s3://idea-10.7289-v5sq8xb5/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/202410/oisst-avhrr-v02r01.20241024_preliminary.nc', 58900, 652770] |
## this should work on remote systems now
aws_credentials = {"endpoint_url": "https://projects.pawsey.org.au", "anon": True}
import dask.multiprocessing
dask.config.set(scheduler='processes', num_workers = 24)
from virtualizarr import open_virtual_dataset
## load in objects from store/bucket, fstring out the s3://bucket/key, open all up and write to json, push that to vzarr/ bucket
import pyarrow.parquet as pq
import pyarrow as pa
from pyarrow import fs
s3 = fs.S3FileSystem(endpoint_override = "projects.pawsey.org.au")
s3table = pq.read_table("idea-objects/idea-objects.parquet", filesystem = s3, filters=[('Bucket','==', 'idea-10.7289-v5sq8xb5')])
nclist = s3table.to_pandas().Key.tolist()
nc = [f's3://idea-10.7289-v5sq8xb5/{nc0}' for nc0 in nclist]
nc.sort()
def open_virtual(filepath, creds):
return open_virtual_dataset(filepath, indexes = {},
loadable_variables=['lon', 'lat', 'time'], drop_variables = ['zlev'],
decode_times = True, reader_options={'storage_options': creds})
vdd = [
dask.delayed(open_virtual)(filepath, aws_credentials)
for filepath in nc
]
vd = dask.compute(vdd)
import xarray
vds = xarray.concat(vd[0], dim = 'time', coords = 'minimal', compat = 'override')
vds.virtualize.to_kerchunk('virt_oisst.json', format='json')
xarray.open_dataset("virt_oisst.json", engine="kerchunk", storage_options={"remote_options": aws_credentials}) |
so
ds = xarray.open_dataset("virt_oisst.json", engine="kerchunk", storage_options={"remote_options": aws_credentials})
ds.sel(time = "2022-01-01").sst.mean()
<xarray.DataArray 'sst' ()> Size: 8B
array(13.76249903)
library(raadtools)
s <- readsst("2022-01-01")
mean(values(s), na.rm = T)
[1] 13.7625
this works
aws_credentials = {"endpoint_url": "https://projects.pawsey.org.au", "anon": True}
import xarray
ds = xarray.open_dataset("https://projects.pawsey.org.au/vzarr/virt_oisst.json", engine = "kerchunk", chunks = {},
storage_options = {"remote_options": aws_credentials})
and so
import xarray
so = {"endpoint_url": "https://projects.pawsey.org.au", "anon": True}
ds = xarray.open_dataset("s3://vzarr/oisst.parquet", engine = "kerchunk", chunks = {},
storage_options={"target_options": so, "remote_options": so}) |
here's the basics of how we could return an xarray object (sometimes you'll want the date from the fileset, sometimes it will be in the object already)
Don't yet know how to specify what the time is (interval or start or middle or whatever)
I think this could avoid the actual calls to open datasets by using kerchunk ...
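A rough shape for that helper, in Python for now; the function name, the h5netcdf engine default, and the time handling are just assumptions, and the kerchunk/virtual route above would replace the per-file opens:
import pandas as pd
import xarray

def open_files_as_xarray(paths, dates=None, engine="h5netcdf"):
    """Open single-time NetCDF files and return one dataset along 'time'.

    If dates is given it overrides whatever time coordinate the files carry
    (useful when the date only lives in the filename)."""
    dss = [xarray.open_dataset(p, engine=engine, chunks={}) for p in paths]
    if dates is not None:
        times = pd.to_datetime(dates)
        dss = [ds.expand_dims(time=[t]) if "time" not in ds.dims
               else ds.assign_coords(time=[t])
               for ds, t in zip(dss, times)]
    return xarray.concat(dss, dim="time")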