fix and improve timeseries collection support
lthurner committed Jan 2, 2024
1 parent 61fc61e commit 20c4d8e
Showing 1 changed file with 58 additions and 31 deletions.
pandahub/lib/PandaHub.py (89 changes: 58 additions & 31 deletions)
@@ -1747,7 +1747,7 @@ def get_variant_filter(self, variants):
     # -------------------------

     def bulk_write_to_db(
-        self, data, collection_name="tasks", global_database=True, project_id=None
+        self, data, collection_name="tasks", global_database=False, project_id=None
     ):
         """
         Writes any number of documents to the database at once. Checks, if any
@@ -1776,8 +1776,21 @@ def bulk_write_to_db(
         else:
             self.check_permission("write")
             db = self._get_project_database()
+        if self.collection_is_timeseries(
+            collection_name=collection_name,
+            project_id=project_id,
+            global_database=global_database,
+        ):
+            raise NotImplementedError(
+                "Bulk write is not fully supported for timeseries collections in MongoDB"
+            )
+
         operations = [
-            ReplaceOne(replacement=d, filter={"_id": d["_id"]}, upsert=True)
+            ReplaceOne(
+                replacement=d,
+                filter={"_id": d["_id"]},
+                upsert=True,
+            )
             for d in data
         ]
         db[collection_name].bulk_write(operations)
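
The reworked operations list follows pymongo's standard bulk-upsert pattern: one ReplaceOne per document, inserted when its _id is not yet present and replaced otherwise, all sent to the server in a single bulk_write call. A minimal standalone sketch (connection URI, database and collection names are placeholders, not taken from this commit):

    from pymongo import MongoClient, ReplaceOne

    client = MongoClient("mongodb://localhost:27017")  # placeholder URI
    db = client["my_project"]  # placeholder database name

    data = [
        {"_id": 1, "name": "task_1", "status": "done"},
        {"_id": 2, "name": "task_2", "status": "open"},
    ]

    # upsert=True inserts a document if no document with that _id exists,
    # otherwise the stored document is replaced wholesale.
    operations = [
        ReplaceOne(filter={"_id": d["_id"]}, replacement=d, upsert=True)
        for d in data
    ]
    result = db["tasks"].bulk_write(operations)
    print(result.upserted_count, result.modified_count)

The added guard raises NotImplementedError for timeseries collections because, as the error message states, MongoDB does not fully support this kind of bulk write on time series collections.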
@@ -1917,7 +1930,7 @@ def write_timeseries_to_db(
                 {"metadata": metadata, "timestamp": idx, **row.to_dict()}
                 for idx, row in timeseries.iterrows()
             ]
-            return db.measurements.insert_many(documents)
+            return db[collection_name].insert_many(documents)
         document = create_timeseries_document(
             timeseries=timeseries,
             data_type=data_type,
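
With the fix, the documents go into db[collection_name] instead of the hard-coded measurements collection. Each DataFrame row becomes one document that carries the shared metadata plus its timestamp index; a small self-contained sketch of the same comprehension, with invented column names and metadata:

    import pandas as pd

    timeseries = pd.DataFrame(
        {"p_mw": [0.4, 0.5]},
        index=pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:15"]),
    )
    metadata = {"_id": "ts_1", "name": "load_1", "data_type": "p_mw"}  # placeholder metadata

    documents = [
        {"metadata": metadata, "timestamp": idx, **row.to_dict()}
        for idx, row in timeseries.iterrows()
    ]
    # documents[0] -> {"metadata": {...}, "timestamp": Timestamp("2024-01-01 00:00"), "p_mw": 0.4}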
@@ -2321,20 +2334,23 @@ def get_timeseries_metadata(
             else:
                 document_filter = {}
             document = db[collection_name].find_one(
-                document_filter, projection={"timestamp": 0, "metadata": 0, "_id": 0}
+                document_filter, projection={"timestamp": 0, "_id": 0}
             )
             value_fields = ["$%s" % field for field in document.keys()]
-            pipeline.append(
-                {
-                    "$group": {
-                        "_id": "$metadata._id",
-                        "max_value": {"$max": {"$max": value_fields}},
-                        "min_value": {"$min": {"$min": value_fields}},
-                        "first_timestamp": {"$min": "$timestamp"},
-                        "last_timestamp": {"$max": "$timestamp"},
-                    }
-                }
-            )
+            group_dict = {
+                "_id": "$metadata._id",
+                "max_value": {"$max": {"$max": value_fields}},
+                "min_value": {"$min": {"$min": value_fields}},
+                "first_timestamp": {"$min": "$timestamp"},
+                "last_timestamp": {"$max": "$timestamp"},
+            }
+            metadata_fields = {
+                metadata_field: {"$first": "$metadata.%s" % metadata_field}
+                for metadata_field in document["metadata"].keys()
+                if metadata_field != "_id"
+            }
+            group_dict.update(metadata_fields)
+            pipeline.append({"$group": group_dict})
         else:
             match_filter = []
             pipeline = []
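
Instead of a fixed $group stage, the stage is now assembled dynamically so that every metadata field of the sampled document is carried into the result via $first. Assuming a sample document with value fields p_mw and q_mvar and metadata fields name and data_type (hypothetical names for illustration), the appended stage would look roughly like this:

    group_stage = {
        "$group": {
            "_id": "$metadata._id",  # one result document per timeseries
            "max_value": {"$max": {"$max": ["$p_mw", "$q_mvar"]}},
            "min_value": {"$min": {"$min": ["$p_mw", "$q_mvar"]}},
            "first_timestamp": {"$min": "$timestamp"},
            "last_timestamp": {"$max": "$timestamp"},
            "name": {"$first": "$metadata.name"},            # carried over from metadata
            "data_type": {"$first": "$metadata.data_type"},  # carried over from metadata
        }
    }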
@@ -2448,30 +2464,41 @@ def multi_get_timeseries_from_db(
                 meta_pipeline = []
                 meta_pipeline.append({"$match": document_filter})
                 value_fields = ["$%s" % field for field in document.keys()]
-                meta_pipeline.append(
-                    {
-                        "$group": {
-                            "_id": "$metadata._id",
-                            "max_value": {"$max": {"$max": value_fields}},
-                            "min_value": {"$min": {"$min": value_fields}},
-                            "first_timestamp": {"$min": "$timestamp"},
-                            "last_timestamp": {"$max": "$timestamp"},
-                            "name": {"$first": "$metadata.name"},
-                            "data_type": {"$first": "$metadata.data_type"},
-                        }
-                    }
-                )
+                group_dict = {
+                    "_id": "$metadata._id",
+                    "max_value": {"$max": {"$max": value_fields}},
+                    "min_value": {"$min": {"$min": value_fields}},
+                    "first_timestamp": {"$min": "$timestamp"},
+                    "last_timestamp": {"$max": "$timestamp"},
+                }
+                document = db[collection_name].find_one(document_filter)
+                metadata_fields = {
+                    metadata_field: {"$first": "$metadata.%s" % metadata_field}
+                    for metadata_field in document["metadata"].keys()
+                    if metadata_field != "_id"
+                }
+                group_dict.update(metadata_fields)
+                meta_pipeline.append({"$group": group_dict})
                 meta_data = {
                     d["_id"]: d for d in db[collection_name].aggregate(meta_pipeline)
                 }
             timeseries = []
-            ts_all = db.measurements.aggregate_pandas_all(pipeline)
+            ts_all = db[collection_name].aggregate_pandas_all(pipeline)
            if len(ts_all) == 0:
                return timeseries
            for _id, ts in ts_all.groupby("_id"):
                ts.set_index("timestamp", inplace=True)
-                for col in set(ts.columns) - {"timestamp", "_id"}:
+                value_columns = list(set(ts.columns) - {"timestamp", "_id"})
+                value_columns.sort()
+                for col in value_columns:
                     timeseries_dict = {"timeseries_data": ts[col]}
                     if include_metadata:
                         timeseries_dict.update(meta_data[_id])
+                        if len(value_columns) > 1:
+                            timeseries_dict["name"] = "%s, %s" % (
+                                timeseries_dict["name"],
+                                col,
+                            )
                     timeseries.append(timeseries_dict)
             return timeseries
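
The reworked loop sorts the value columns so results come back in a deterministic order and, when a document carries more than one value column, appends the column name to the metadata name to keep the entries distinguishable. A minimal sketch of that logic with invented column names and metadata:

    import pandas as pd

    ts = pd.DataFrame(
        {"p_mw": [0.4, 0.5], "q_mvar": [0.1, 0.2]},
        index=pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:15"]),
    )
    meta = {"_id": "ts_1", "name": "load_1"}  # placeholder metadata

    timeseries = []
    value_columns = sorted(set(ts.columns) - {"timestamp", "_id"})
    for col in value_columns:
        entry = {"timeseries_data": ts[col]}
        entry.update(meta)
        if len(value_columns) > 1:
            entry["name"] = "%s, %s" % (entry["name"], col)  # e.g. "load_1, p_mw"
        timeseries.append(entry)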

@@ -2863,7 +2890,7 @@ def create_timeseries_collection(self, collection_name, overwrite=False):
             if overwrite:
                 db.drop_collection(collection_name)
             else:
-                print("Collection exists, skipping")
+                logger.info("Collection already exists, skipping")
                 return
         db.create_collection(
             collection_name,
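
The options of the create_collection call are collapsed in this diff. In pymongo, a MongoDB time series collection is created roughly as sketched below; the field names mirror the "timestamp" and "metadata" keys used by the insert code above, but the exact options pandahub passes are not shown in this commit:

    from pymongo import MongoClient
    from pymongo.errors import CollectionInvalid

    client = MongoClient("mongodb://localhost:27017")  # placeholder URI
    db = client["my_project"]  # placeholder database name

    try:
        db.create_collection(
            "timeseries",  # placeholder collection name
            timeseries={"timeField": "timestamp", "metaField": "metadata"},
        )
    except CollectionInvalid:
        # raised when the collection already exists
        pass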
