From d594dd1bb0faec2158f3d9a129a4d59808f149c2 Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Fri, 22 Dec 2023 15:33:09 +0000 Subject: [PATCH 1/5] update --- climetlab/sources/era5_accumulations.py | 113 ++++++++++++++++-------- climetlab/sources/oper_accumulations.py | 6 +- 2 files changed, 80 insertions(+), 39 deletions(-) diff --git a/climetlab/sources/era5_accumulations.py b/climetlab/sources/era5_accumulations.py index ae851cc1..d7194a64 100644 --- a/climetlab/sources/era5_accumulations.py +++ b/climetlab/sources/era5_accumulations.py @@ -13,34 +13,9 @@ import climetlab as cml from climetlab import Source +from climetlab.core.temporary import temp_file from climetlab.decorators import normalize - -mapping = [ - [-1, 18, 6], - [-1, 18, 7], - [-1, 18, 8], - [-1, 18, 9], - [-1, 18, 10], - [-1, 18, 11], - [-1, 18, 12], - [0, 6, 1], - [0, 6, 2], - [0, 6, 3], - [0, 6, 4], - [0, 6, 5], - [0, 6, 6], - [0, 6, 7], - [0, 6, 8], - [0, 6, 9], - [0, 6, 10], - [0, 6, 11], - [0, 6, 12], - [0, 18, 1], - [0, 18, 2], - [0, 18, 3], - [0, 18, 4], - [0, 18, 5], -] +from climetlab.readers.grib.output import new_grib_output class Era5Accumulations(Source): @@ -67,6 +42,7 @@ def __init__(self, *args, **kwargs): for p in param: assert p in ["cp", "lsp", "tp"], p + user_step = 6 # For now, we only support 6h accumulation user_dates = request["date"] user_times = request["time"] @@ -85,17 +61,23 @@ def __init__(self, *args, **kwargs): assert isinstance(user_time, int), (type(user_time), user_dates, user_times) assert 0 <= user_time <= 24, user_time - date = user_date + datetime.timedelta(hours=user_time) - delta, time, step = mapping[date.hour] + requested.add(user_date + datetime.timedelta(hours=user_time)) + + when = ( + user_date + + datetime.timedelta(hours=user_time) + - datetime.timedelta(hours=user_step) + ) + add_step = 0 - assert 0 <= time <= 23, time - assert 0 <= step <= 24, step + while when.hour not in (6, 18): + when -= datetime.timedelta(hours=1) + add_step += 1 - when = date + datetime.timedelta(days=delta) dates.add(datetime.datetime(when.year, when.month, when.day)) - times.add(time) - steps.add(step) - requested.add(date) + times.add(when.hour) + for step in range(1, user_step + 1): + steps.add(step + add_step) valids = defaultdict(list) for date, time, step in itertools.product(dates, times, steps): @@ -106,7 +88,7 @@ def __init__(self, *args, **kwargs): got = set(valids.keys()) assert all(len(x) == 1 for x in valids.values()) missing = requested - got - assert len(missing) == 0 + assert len(missing) == 0, missing # extra = got - requested @@ -128,8 +110,67 @@ def __init__(self, *args, **kwargs): ) ds = cml.load_source("mars", **era_request) + + ds = ds.order_by("param", "date", "time", "step") + last_key = None + fields = [] + + tmp = temp_file() + path = tmp.path + out = new_grib_output(path) + + def flush(): + nonlocal last_key, fields + if last_key is None: + return + lastStep = None + values = None + startSteps = [] + endSteps = [] + for field in fields: + startStep = field.metadata("startStep") + endStep = field.metadata("endStep") + startSteps.append(startStep) + endSteps.append(endStep) + if lastStep is not None: + assert startStep == lastStep + assert endStep - startStep == 1, (startStep, endStep) + lastStep = endStep + if values is None: + values = field.values + else: + values += field.values + + out.write( + values, + template=fields[0], + startStep=min(startSteps), + endStep=max(endSteps), + ) + + fields = [] + + for field in ds: + key = ( + field.metadata("param"), + field.metadata("date"), + field.metadata("time"), + ) + step = field.metadata("step") + if key != last_key: + flush() + last_key = key + + fields.append(field) + + flush() + out.close() + + ds = cml.load_source("file", path) + index = [d.valid_datetime() in requested for d in ds] self.ds = ds[index] + self.ds._tmp = tmp def mutate(self): return self.ds diff --git a/climetlab/sources/oper_accumulations.py b/climetlab/sources/oper_accumulations.py index 2a1b0021..6f06048a 100644 --- a/climetlab/sources/oper_accumulations.py +++ b/climetlab/sources/oper_accumulations.py @@ -38,7 +38,7 @@ def __init__(self, *args, **kwargs): "scda": {"dates": set(), "times": set()}, } - step = 6 + user_step = 6 requested = set() for user_date, user_time in itertools.product(user_dates, user_times): @@ -54,7 +54,7 @@ def __init__(self, *args, **kwargs): requested.add(when) - when -= datetime.timedelta(hours=step) + when -= datetime.timedelta(hours=user_step) date = datetime.datetime(when.year, when.month, when.day) time = when.hour @@ -84,7 +84,7 @@ def __init__(self, *args, **kwargs): "stream": stream, "date": [d.strftime("%Y-%m-%d") for d in dates], "time": sorted(times), - "step": step, + "step": user_step, } ) From 32d17b981a8209b0e81616c99aca02ea40320009 Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Fri, 22 Dec 2023 18:33:15 +0000 Subject: [PATCH 2/5] update --- climetlab/sources/era5_accumulations.py | 185 +++++++++++++----------- 1 file changed, 102 insertions(+), 83 deletions(-) diff --git a/climetlab/sources/era5_accumulations.py b/climetlab/sources/era5_accumulations.py index d7194a64..150a287f 100644 --- a/climetlab/sources/era5_accumulations.py +++ b/climetlab/sources/era5_accumulations.py @@ -7,15 +7,76 @@ # nor does it submit to any jurisdiction. # +from collections import defaultdict import datetime import itertools -from collections import defaultdict import climetlab as cml from climetlab import Source from climetlab.core.temporary import temp_file from climetlab.decorators import normalize from climetlab.readers.grib.output import new_grib_output +from climetlab.utils.availability import Availability + + +class Accumulation: + def __init__(self, out, param, date, time, step): + self.out = out + self.param = param + self.date = date + self.time = time * 100 + self.steps = tuple(step) + self.values = None + self.seen = set() + self.startStep = None + self.endStep = None + self.done = False + + @property + def key(self): + return (self.param, self.date, self.time, self.steps) + + def add(self, field): + step = field.metadata("step") + if step not in self.steps: + return + + assert not self.done, (self.key, step) + + startStep = field.metadata("startStep") + endStep = field.metadata("endStep") + + assert endStep == step, (startStep, endStep, step) + assert step not in self.seen, (self.key, step) + + assert endStep - startStep == 1, (startStep, endStep) + + if self.startStep is None: + self.startStep = startStep + else: + self.startStep = min(self.startStep, startStep) + + if self.endStep is None: + self.endStep = endStep + else: + self.endStep = max(self.endStep, endStep) + + if self.values is None: + self.values = field.values + else: + self.values += field.values + + self.seen.add(step) + + if len(self.seen) == len(self.steps): + self.out.write( + self.values, + template=field, + startStep=self.startStep, + endStep=self.endStep, + ) + self.values = None + self.done = True class Era5Accumulations(Source): @@ -48,9 +109,19 @@ def __init__(self, *args, **kwargs): requested = set() - dates = set() - times = set() - steps = set() + era_request = dict(**request) + + type_ = request.get("type", "an") + if type_ == "an": + type_ = "fc" + + era_request.update({"class": "ea", "type": type_, "levtype": "sfc"}) + + tmp = temp_file() + path = tmp.path + out = new_grib_output(path) + + requests = [] for user_date, user_time in itertools.product(user_dates, user_times): assert isinstance(user_date, datetime.datetime), ( @@ -74,102 +145,50 @@ def __init__(self, *args, **kwargs): when -= datetime.timedelta(hours=1) add_step += 1 - dates.add(datetime.datetime(when.year, when.month, when.day)) - times.add(when.hour) - for step in range(1, user_step + 1): - steps.add(step + add_step) - - valids = defaultdict(list) - for date, time, step in itertools.product(dates, times, steps): - valids[ - date + datetime.timedelta(hours=time) + datetime.timedelta(hours=step) - ].append((date, time, step)) + steps = tuple(step + add_step for step in range(1, user_step + 1)) - got = set(valids.keys()) - assert all(len(x) == 1 for x in valids.values()) - missing = requested - got - assert len(missing) == 0, missing + for p in param: + requests.append( + { + "param": p, + "date": int(when.strftime("%Y%m%d")), + "time": when.hour, + "step": sorted(steps), + } + ) - # extra = got - requested + compressed = Availability(requests) + ds = cml.load_source("empty") + for r in compressed.iterate(): + era_request.update(r) + ds = ds + cml.load_source("mars", **era_request) - era_request = dict(**request) - - type_ = request.get("type", "an") - if type_ == "an": - type_ = "fc" - - era_request.update( - { - "class": "ea", - "type": type_, - "levtype": "sfc", - "date": [d.strftime("%Y-%m-%d") for d in dates], - "time": sorted(times), - "step": sorted(steps), - } - ) - - ds = cml.load_source("mars", **era_request) - - ds = ds.order_by("param", "date", "time", "step") - last_key = None - fields = [] - - tmp = temp_file() - path = tmp.path - out = new_grib_output(path) - - def flush(): - nonlocal last_key, fields - if last_key is None: - return - lastStep = None - values = None - startSteps = [] - endSteps = [] - for field in fields: - startStep = field.metadata("startStep") - endStep = field.metadata("endStep") - startSteps.append(startStep) - endSteps.append(endStep) - if lastStep is not None: - assert startStep == lastStep - assert endStep - startStep == 1, (startStep, endStep) - lastStep = endStep - if values is None: - values = field.values - else: - values += field.values - - out.write( - values, - template=fields[0], - startStep=min(startSteps), - endStep=max(endSteps), - ) + accumulations = defaultdict(list) + for a in [Accumulation(out, **r) for r in requests]: + for s in a.steps: + accumulations[(a.param, a.date, a.time, s)].append(a) - fields = [] for field in ds: key = ( field.metadata("param"), field.metadata("date"), field.metadata("time"), + field.metadata("step"), ) - step = field.metadata("step") - if key != last_key: - flush() - last_key = key + for a in accumulations[key]: + a.add(field) - fields.append(field) + for acc in accumulations.values(): + for a in acc: + assert a.done, (a.key, a.seen, a.steps) - flush() out.close() ds = cml.load_source("file", path) - index = [d.valid_datetime() in requested for d in ds] - self.ds = ds[index] + self.ds = cml.load_source("file", path) + assert len(self.ds)/len(param) == len(requested), (len(self.ds), len(param), len(requested)) self.ds._tmp = tmp def mutate(self): From 589c1e09f6b659dc1b5b05c8ec4c265a461ca7b6 Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Sat, 23 Dec 2023 08:49:04 +0000 Subject: [PATCH 3/5] update --- climetlab/core/index.py | 3 +++ climetlab/sources/era5_accumulations.py | 24 +++++++++++++++--------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/climetlab/core/index.py b/climetlab/core/index.py index 3251050a..1b57ffa6 100644 --- a/climetlab/core/index.py +++ b/climetlab/core/index.py @@ -145,6 +145,9 @@ def get(self, x): order[int(key)] = i except ValueError: pass + except TypeError: + print('Cannot convert "%s" to int (%s)' % (key, type(key))) + raise try: order[float(key)] = i except ValueError: diff --git a/climetlab/sources/era5_accumulations.py b/climetlab/sources/era5_accumulations.py index 150a287f..aa831ee8 100644 --- a/climetlab/sources/era5_accumulations.py +++ b/climetlab/sources/era5_accumulations.py @@ -7,9 +7,9 @@ # nor does it submit to any jurisdiction. # -from collections import defaultdict import datetime import itertools +from collections import defaultdict import climetlab as cml from climetlab import Source @@ -36,7 +36,7 @@ def __init__(self, out, param, date, time, step): def key(self): return (self.param, self.date, self.time, self.steps) - def add(self, field): + def add(self, field, values): step = field.metadata("step") if step not in self.steps: return @@ -62,9 +62,11 @@ def add(self, field): self.endStep = max(self.endStep, endStep) if self.values is None: - self.values = field.values - else: - self.values += field.values + import numpy as np + + self.values = np.zeros_like(values) + + self.values += values self.seen.add(step) @@ -164,11 +166,10 @@ def __init__(self, *args, **kwargs): ds = ds + cml.load_source("mars", **era_request) accumulations = defaultdict(list) - for a in [Accumulation(out, **r) for r in requests]: + for a in [Accumulation(out, **r) for r in requests]: for s in a.steps: accumulations[(a.param, a.date, a.time, s)].append(a) - for field in ds: key = ( field.metadata("param"), @@ -176,8 +177,9 @@ def __init__(self, *args, **kwargs): field.metadata("time"), field.metadata("step"), ) + values = field.values # optimisation for a in accumulations[key]: - a.add(field) + a.add(field, values) for acc in accumulations.values(): for a in acc: @@ -188,7 +190,11 @@ def __init__(self, *args, **kwargs): ds = cml.load_source("file", path) self.ds = cml.load_source("file", path) - assert len(self.ds)/len(param) == len(requested), (len(self.ds), len(param), len(requested)) + assert len(self.ds) / len(param) == len(requested), ( + len(self.ds), + len(param), + len(requested), + ) self.ds._tmp = tmp def mutate(self): From 57c57925f0867f4368b2dde3cf90bfd5402d2dc9 Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Fri, 5 Jan 2024 10:29:36 +0000 Subject: [PATCH 4/5] support for enda precips --- climetlab/sources/era5_accumulations.py | 53 +++++++++++++++++-------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/climetlab/sources/era5_accumulations.py b/climetlab/sources/era5_accumulations.py index aa831ee8..bdb58079 100644 --- a/climetlab/sources/era5_accumulations.py +++ b/climetlab/sources/era5_accumulations.py @@ -20,21 +20,23 @@ class Accumulation: - def __init__(self, out, param, date, time, step): + def __init__(self, out, param, date, time, step, number, stepping): self.out = out self.param = param self.date = date self.time = time * 100 + self.number = number self.steps = tuple(step) self.values = None self.seen = set() self.startStep = None self.endStep = None self.done = False + self.stepping = stepping @property def key(self): - return (self.param, self.date, self.time, self.steps) + return (self.param, self.date, self.time, self.steps, self.number) def add(self, field, values): step = field.metadata("step") @@ -49,7 +51,7 @@ def add(self, field, values): assert endStep == step, (startStep, endStep, step) assert step not in self.seen, (self.key, step) - assert endStep - startStep == 1, (startStep, endStep) + assert endStep - startStep == self.stepping, (startStep, endStep) if self.startStep is None: self.startStep = startStep @@ -102,10 +104,15 @@ def __init__(self, *args, **kwargs): param = request["param"] if not isinstance(param, (list, tuple)): param = [param] + for p in param: assert p in ["cp", "lsp", "tp"], p + number = request.get("number", [0]) + assert isinstance(number, (list, tuple)) + user_step = 6 # For now, we only support 6h accumulation + user_dates = request["date"] user_times = request["time"] @@ -117,6 +124,12 @@ def __init__(self, *args, **kwargs): if type_ == "an": type_ = "fc" + stepping = 1 + if request.get("stream") == "enda": + stepping = 3 + for n in user_times: + assert n % 6 == 0, n + era_request.update({"class": "ea", "type": type_, "levtype": "sfc"}) tmp = temp_file() @@ -144,20 +157,25 @@ def __init__(self, *args, **kwargs): add_step = 0 while when.hour not in (6, 18): - when -= datetime.timedelta(hours=1) - add_step += 1 + when -= datetime.timedelta(hours=stepping) + add_step += stepping - steps = tuple(step + add_step for step in range(1, user_step + 1)) + steps = tuple( + step + add_step + for step in range(stepping, user_step + stepping, stepping) + ) for p in param: - requests.append( - { - "param": p, - "date": int(when.strftime("%Y%m%d")), - "time": when.hour, - "step": sorted(steps), - } - ) + for n in number: + requests.append( + { + "param": p, + "date": int(when.strftime("%Y%m%d")), + "time": when.hour, + "step": sorted(steps), + "number": n, + } + ) compressed = Availability(requests) ds = cml.load_source("empty") @@ -166,9 +184,9 @@ def __init__(self, *args, **kwargs): ds = ds + cml.load_source("mars", **era_request) accumulations = defaultdict(list) - for a in [Accumulation(out, **r) for r in requests]: + for a in [Accumulation(out, stepping=stepping, **r) for r in requests]: for s in a.steps: - accumulations[(a.param, a.date, a.time, s)].append(a) + accumulations[(a.param, a.date, a.time, s, a.number)].append(a) for field in ds: key = ( @@ -176,6 +194,7 @@ def __init__(self, *args, **kwargs): field.metadata("date"), field.metadata("time"), field.metadata("step"), + field.metadata("number"), ) values = field.values # optimisation for a in accumulations[key]: @@ -190,7 +209,7 @@ def __init__(self, *args, **kwargs): ds = cml.load_source("file", path) self.ds = cml.load_source("file", path) - assert len(self.ds) / len(param) == len(requested), ( + assert len(self.ds) / len(param) / len(number) == len(requested), ( len(self.ds), len(param), len(requested), From bb33e0c95b2769e77f3d89819e6886de2171276e Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Tue, 9 Jan 2024 19:38:29 +0000 Subject: [PATCH 5/5] Bump version 0.19.2 --- climetlab/version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/climetlab/version b/climetlab/version index 41915c79..61e6e92d 100644 --- a/climetlab/version +++ b/climetlab/version @@ -1 +1 @@ -0.19.1 +0.19.2