Skip to content

Commit

Permalink
Definitions for measurements api (#6)
Browse files Browse the repository at this point in the history
* add measurements protobuf definition

* inlude measurement api definitions into client

* fix error when running dataset api query directly on job queue

* example notebook for measurements

* linting and small cleanup

* remove unnecessary logging message

Co-authored-by: fredericschwarz <[email protected]>
  • Loading branch information
fred-sch and fredericschwarz authored Jul 28, 2021
1 parent df6b06d commit 9c3f3e8
Show file tree
Hide file tree
Showing 15 changed files with 1,172 additions and 43 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,7 @@ dmypy.json
.pyre/

#IDEs
.idea/
.idea/

#Apple
*.DS_Store
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def meteoblue_timeinterval_to_timestamps(t):
return list(map(map_ts, t.timestrings))

timerange = range(t.start, t.end, t.stride)
return list(map(lambda t: dt.date.fromtimestamp(t), timerange))
return list(map(lambda t: dt.datetime.fromtimestamp(t), timerange))

query = { ... }
result = client.query_sync(query)
Expand Down
5 changes: 3 additions & 2 deletions example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"source": [
"import logging\n",
"import meteoblue_dataset_sdk\n",
"import os\n",
"\n",
"# Display information about the current download state\n",
"logging.basicConfig(level=logging.INFO)\n",
Expand Down Expand Up @@ -66,8 +67,8 @@
" }\n",
" ],\n",
"}\n",
"client = meteoblue_dataset_sdk.Client(apikey=\"xxxxxx\")\n",
"result = client.querySync(query)\n",
"client = meteoblue_dataset_sdk.Client(apikey=os.environ[\"APIKEY\"])\n",
"result = client.query_sync(query)\n",
"# result is a structured object containing timestamps and data\n",
"\n",
"timeInterval = result.geometries[0].timeIntervals[0]\n",
Expand Down
332 changes: 332 additions & 0 deletions example_measurements.ipynb

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions meteoblue_dataset_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
from .client import ApiError, Client, Error

# from .Dataset_pb2 import Dataset_pb2
3 changes: 2 additions & 1 deletion meteoblue_dataset_sdk/caching/filecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import logging
import tempfile
import zlib
from typing import Optional
from pathlib import Path
from typing import Optional

import aiofiles
import aiofiles.os

Expand Down
137 changes: 113 additions & 24 deletions meteoblue_dataset_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
"""

import asyncio
import logging
from contextlib import asynccontextmanager
import copy
import hashlib
import json
import logging
from contextlib import asynccontextmanager

import aiohttp

from .Dataset_pb2 import DatasetApiProtobuf
from .protobuf.dataset_pb2 import DatasetApiProtobuf
from .protobuf.measurements_pb2 import MeasurementApiProtobuf
from .utils import run_async


Expand Down Expand Up @@ -60,18 +63,24 @@ async def _fetch(
session: aiohttp.ClientSession,
method: str,
url: str,
post_data: dict = None,
body_dict: dict = None,
query_params: dict = None,
):
"""
Fetch data from an URL and try for error 5xx or timeouts.
Codes other than 2xx will throw an exception.
:param url: url to call http GET on
:param session: an active aiohttp.ClientSession
:param method: HTTP verb to use for the request
:return: query_params: the parameters to use for the query
:param url: url to fetch data from
:param body_dict: parameters transferred in the body
:param query_params: parameters transferred as query parameters in the url
:return: ClientResponse object from aiohttp lib
"""
logging.debug(f"Getting url {method} {url}")
for retry in range(self._config.http_max_retry_count):
async with session.request(method, url, json=post_data) as response:
async with session.request(
method, url, json=body_dict, params=query_params
) as response:
# return if successful
if 200 <= response.status <= 299:
yield response
Expand All @@ -80,10 +89,15 @@ async def _fetch(
# meteoblue APIs return a JSON encoded error message
if response.status == 400 or response.status == 500:
json_response = await response.json()
logging.debug(
f"API returned error message: {json_response['error_message']}"
)
raise ApiError(json_response["error_message"])
# TODO: dataset api returns object with 'error_message'
# measurement api object with 'reason'
error_message = ""
if "error_message" in json_response:
error_message = json_response["error_message"]
else:
error_message = json_response["reason"]
logging.debug(f"API returned error message: {error_message}")
raise ApiError(error_message)

if retry == self._config.http_max_retry_count - 1:
logging.error(f"API returned unexpected error: {response.content}")
Expand All @@ -108,7 +122,7 @@ async def _run_on_job_queue(self, session: aiohttp.ClientSession, params: dict):
logging.info("Starting job on queue")
params["runOnJobQueue"] = True
url = self._config.query_url.format(self._config.api_key)
async with self._fetch(session, "POST", url, post_data=params) as response:
async with self._fetch(session, "POST", url, body_dict=params) as response:
response_json = await response.json()

# Wait until the job is finished
Expand Down Expand Up @@ -148,17 +162,19 @@ async def _query_raw(self, params: dict):
:return: ClientResponse object from aiohttp lib
"""

# always try to execute without job queue first:
params["runOnJobQueue"] = False
async with aiohttp.ClientSession() as session:
# Try to run the job directly
# In case the API throws an error, try to run it on a job queue
try:
url = self._config.query_url.format(self._config.api_key)
async with self._fetch(
session, "POST", url, post_data=params
session, "POST", url, body_dict=params
) as response:
yield response
except ApiError as error:
# Run on a job queue in case the api throws the error
# Run on a job queue in case the api throws this error
if error.message != "This job must be executed on a job-queue":
raise
async with self._run_on_job_queue(session, params) as response:
Expand All @@ -168,6 +184,12 @@ async def _query_raw(self, params: dict):
def _hash_params(params: dict) -> str:
return hashlib.md5(json.dumps(params).encode()).hexdigest()

@staticmethod
def _parse_dataset(data):
msg = DatasetApiProtobuf()
msg.ParseFromString(data)
return msg

async def query(self, params: dict):
"""
Query meteoblue dataset api asynchronously, transfer data using protobuf and
Expand All @@ -178,28 +200,25 @@ async def query(self, params: dict):
see https://docs.meteoblue.com/en/apis/environmental-data/dataset-api
:return: DatasetApiProtobuf object
"""

# copy params object before making changes to it
params = copy.copy(params)
params["format"] = "protobuf"
cache_key = ""
if self.cache:
cache_key = self._hash_params(params)
cached_query_results = await self.cache.get(cache_key)
if cached_query_results:
msg = DatasetApiProtobuf()
msg.ParseFromString(cached_query_results)
return msg
return self._parse_dataset(cached_query_results)

async with self._query_raw(params) as response:
data = await response.read()
if self.cache:
await self.cache.set(cache_key, data)
msg = DatasetApiProtobuf()
msg.ParseFromString(data)
return msg
return self._parse_dataset(data)

def querySync(self, params: dict):
"""
Query Meteoblue dataset api synchronously for sequential usage.
Query meteoblue dataset api synchronously for sequential usage.
Prefer query_sync in order to respect python semantic.
:param params:
query parameters.
Expand All @@ -210,7 +229,77 @@ def querySync(self, params: dict):

def query_sync(self, params: dict):
"""
Exactly the same as query sync but using underscore in the name.
Keeping QuerySync for backward compatibility.
Exactly the same as querySync but using underscore in the name.
Keeping querySync for backward compatibility.
"""
return self.querySync(params)

def measurement_sync(self, path: str, params: dict):
"""
Query meteoblue measurement api synchronously for sequential usage.
:param path: path of request
:param params: query parameters
:return: MeasurementApiProtobuf object
"""
return run_async(self.measurement_query, path, params)

async def measurement_query(self, path: str, params: dict):
"""
Query meteoblue measurement api asynchronously, transfer data using protobuf and
return a structured object
:param path: path of request
:param params: query parameters
:return: MeasurementApiProtobuf object
"""
# copy params object before making changes to it
params = copy.copy(params)
params["format"] = "protobuf"
cache_key = ""
if self.cache:
cache_key = self._hash_params_measurements(path, params)
cached_query_results = await self.cache.get(cache_key)
if cached_query_results:
self._parse_measurements(cached_query_results)

async with self._query_measurement_api(path, params) as response:
data = await response.read()
if self.cache:
await self.cache.set(cache_key, data)
return self._parse_measurements(data)

@staticmethod
def _parse_measurements(data):
msg = MeasurementApiProtobuf()
msg.ParseFromString(data)
return msg

@staticmethod
def _hash_params_measurements(path: str, params: dict) -> str:
return hashlib.md5(path.encode(), json.dumps(params).encode()).hexdigest()

@asynccontextmanager
async def _query_measurement_api(self, path: str, params: dict):
"""
Query meteoblue measurement api asynchronously and return a
ClientResponse object using context manager
:param path: path of request
:param params: query parameters
:return: ClientResponse object from aiohttp lib
"""
base_url = "http://measurement-api.meteoblue.com"
params["apikey"] = self._config.api_key
url = base_url + path
async with aiohttp.ClientSession() as session:
async with self._fetch(
session=session,
method="GET",
url=url,
body_dict=None,
query_params=params,
) as response:
yield response
Empty file.
File renamed without changes.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions meteoblue_dataset_sdk/protobuf/measurements.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
syntax = "proto3";

message MeasurementApiProtobuf {

// variables go here
repeated Column columns = 1;
uint32 rows_count = 2;
uint32 current_page = 3;
uint32 rows_per_page = 4;


message Column {
// meta
string column = 1;
// values
Values values = 2;
}

// Variant type encoding
message Values {
// Exactly one of these values must be present in a valid message
oneof oneof_values {
RepeatedString strings = 1;
RepeatedFloat floats = 2;
RepeatedInt64 ints64 = 3;
}
}

message RepeatedString {
repeated string array = 1;
}

message RepeatedFloat {
repeated float array = 1 [packed=true];
}

message RepeatedInt64 {
repeated int64 array = 1 [packed=true];
}
}
Loading

0 comments on commit 9c3f3e8

Please sign in to comment.