Optimized requests to the analytics DB, using timestamps, to avoid going through the whole table #7833

Merged · 15 commits · May 6, 2024
@@ -0,0 +1,4 @@
### Fixed

- Longer analytics report calculation caused by inefficient requests to the analytics DB
(<https://github.com/cvat-ai/cvat/pull/7833>)
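The fix is easiest to see in the query strings: every ClickHouse request now carries a half-open `[start_datetime, end_datetime)` window derived from the job/task/project timestamps, so the `events` table no longer has to be scanned in full. A minimal before/after sketch of the idea (simplified from the actual diff below; the `{name:Type}` placeholders are ClickHouse server-side query parameters):

```python
# Illustrative only: simplified versions of the queries changed in this PR.

# Before: no time bounds, so ClickHouse may scan every event for the job.
OLD_QUERY = (
    "SELECT job_id, sum(JSONExtractUInt(payload, 'working_time')) as wt "
    "FROM events WHERE job_id={job_id:UInt64} GROUP BY job_id"
)

# After: the scan is restricted to the window in which the job existed,
# outside of which it cannot have produced events.
NEW_QUERY = (
    "SELECT job_id, JSONExtractUInt(payload, 'working_time') as wt, timestamp "
    "FROM events "
    "WHERE job_id={job_id:UInt64} "
    "AND timestamp >= {start_datetime:DateTime64} "
    "AND timestamp < {end_datetime:DateTime64} "
    "ORDER BY timestamp"
)
```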
74 changes: 63 additions & 11 deletions cvat/apps/analytics_report/report/create.py
@@ -182,12 +182,26 @@ def _check_analytics_report(
queryset = Job.objects.select_related("analytics_report")
db_job = queryset.get(pk=cvat_job_id)

start_timestamp = db_job.created_date
end_timestamp = db_job.updated_date

db_report = cls._get_analytics_report(db_job)
primary_metric_extractors = dict(
(
(JobObjects.key(), JobObjectsExtractor(cvat_job_id)),
(JobAnnotationSpeed.key(), JobAnnotationSpeedExtractor(cvat_job_id)),
(JobAnnotationTime.key(), JobAnnotationTimeExtractor(cvat_job_id)),
(
JobObjects.key(),
JobObjectsExtractor(start_timestamp, end_timestamp, cvat_job_id),
),
(
JobAnnotationSpeed.key(),
JobAnnotationSpeedExtractor(
start_timestamp, end_timestamp, cvat_job_id
),
),
(
JobAnnotationTime.key(),
JobAnnotationTimeExtractor(start_timestamp, end_timestamp, cvat_job_id),
),
)
)
db_report = cls()._compute_report_for_job(
@@ -208,18 +222,30 @@ def _check_analytics_report(
"segment_set__job_set"
)
db_task = queryset.get(pk=cvat_task_id)

db_report = cls._get_analytics_report(db_task)

start_timestamp = db_task.created_date
end_timestamp = db_task.updated_date

primary_metric_extractors = dict(
(
(JobObjects.key(), JobObjectsExtractor(task_ids=[cvat_task_id])),
(
JobObjects.key(),
JobObjectsExtractor(
start_timestamp, end_timestamp, task_ids=[cvat_task_id]
),
),
(
JobAnnotationSpeed.key(),
JobAnnotationSpeedExtractor(task_ids=[cvat_task_id]),
JobAnnotationSpeedExtractor(
start_timestamp, end_timestamp, task_ids=[cvat_task_id]
),
),
(
JobAnnotationTime.key(),
JobAnnotationTimeExtractor(task_ids=[cvat_task_id]),
JobAnnotationTimeExtractor(
start_timestamp, end_timestamp, task_ids=[cvat_task_id]
),
),
)
)
@@ -258,12 +284,38 @@ def _check_analytics_report(

db_project = queryset.get(pk=cvat_project_id)
db_report = cls._get_analytics_report(db_project)
task_ids = [item["id"] for item in db_project.tasks.values("id")]

tasks_data = db_project.tasks.values("id", "created_date", "updated_date")
start_timestamp = (
min([item["created_date"] for item in tasks_data])
if len(tasks_data)
else db_project.created_date
)
end_timestamp = (
max([item["updated_date"] for item in tasks_data])
if len(tasks_data)
else db_project.updated_date
)
task_ids = [item["id"] for item in tasks_data]

primary_metric_extractors = dict(
(
(JobObjects.key(), JobObjectsExtractor(task_ids=task_ids)),
(JobAnnotationSpeed.key(), JobAnnotationSpeedExtractor(task_ids=task_ids)),
(JobAnnotationTime.key(), JobAnnotationTimeExtractor(task_ids=task_ids)),
(
JobObjects.key(),
JobObjectsExtractor(start_timestamp, end_timestamp, task_ids=task_ids),
),
(
JobAnnotationSpeed.key(),
JobAnnotationSpeedExtractor(
start_timestamp, end_timestamp, task_ids=task_ids
),
),
(
JobAnnotationTime.key(),
JobAnnotationTimeExtractor(
start_timestamp, end_timestamp, task_ids=task_ids
),
),
)
)
db_report, task_reports, job_reports = cls()._compute_report_for_project(
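The project branch above takes the window from the oldest task `created_date` and the newest task `updated_date`, falling back to the project's own dates when it has no tasks. As a side note, the same bounds could be computed in the database instead of in Python; a sketch using standard Django aggregates (`db_project` as in the code above; equivalent for non-empty task sets):

```python
# Sketch: compute the report window with DB-side aggregates instead of
# Python min()/max() over tasks_data. Min/Max return None for an empty
# queryset, so the fallback mirrors the diff's len(tasks_data) check.
from django.db.models import Max, Min

bounds = db_project.tasks.aggregate(start=Min("created_date"), end=Max("updated_date"))
start_timestamp = bounds["start"] or db_project.created_date
end_timestamp = bounds["end"] or db_project.updated_date
```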
cvat/apps/analytics_report/report/primary_metrics/annotation_speed.py
@@ -2,6 +2,9 @@
#
# SPDX-License-Identifier: MIT

from copy import deepcopy
from datetime import datetime

from dateutil import parser

import cvat.apps.dataset_manager as dm
@@ -19,13 +22,33 @@


class JobAnnotationSpeedExtractor(DataExtractorBase):
def __init__(self, job_id: int = None, task_ids: list[int] = None):
super().__init__(job_id, task_ids)
def __init__(
self,
start_datetime: datetime,
end_datetime: datetime,
job_id: int = None,
task_ids: list[int] = None,
):
super().__init__(start_datetime, end_datetime, job_id, task_ids)

SELECT = ["job_id", "JSONExtractUInt(payload, 'working_time') as wt, timestamp"]
WHERE = []

if task_ids is not None:
self._query = "SELECT job_id, sum(JSONExtractUInt(payload, 'working_time')) as wt FROM events WHERE task_id IN ({task_ids:Array(UInt64)}) AND timestamp >= {start_datetime:DateTime64} AND timestamp < {end_datetime:DateTime64} GROUP BY job_id"
WHERE.append("task_id IN ({task_ids:Array(UInt64)})")
elif job_id is not None:
self._query = "SELECT job_id, sum(JSONExtractUInt(payload, 'working_time')) as wt FROM events WHERE job_id={job_id:UInt64} AND timestamp >= {start_datetime:DateTime64} AND timestamp < {end_datetime:DateTime64} GROUP BY job_id"
WHERE.append("job_id={job_id:UInt64}")

WHERE.extend(
[
"wt > 0",
"timestamp >= {start_datetime:DateTime64}",
"timestamp < {end_datetime:DateTime64}",
]
)

# bandit false alarm
self._query = f"SELECT {', '.join(SELECT)} FROM events WHERE {' AND '.join(WHERE)} ORDER BY timestamp" # nosec B608
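The extractor now composes its SQL from `SELECT`/`WHERE` lists instead of keeping two hard-coded strings. There is also a design shift: the removed queries had ClickHouse compute `sum(...) GROUP BY job_id`, while the new one returns per-event rows so the caller can re-window and sum them itself, as `JobAnnotationSpeed.calculate` does further down (filtering on the `wt` alias in `WHERE` is a ClickHouse extension). For the single-job case the assembled string comes out as below (a reconstruction for readability, not a literal from the PR):

```python
# Reconstructed single-job query, with whitespace added for readability:
assembled_job_query = """
SELECT job_id, JSONExtractUInt(payload, 'working_time') as wt, timestamp
FROM events
WHERE job_id={job_id:UInt64}
  AND wt > 0
  AND timestamp >= {start_datetime:DateTime64}
  AND timestamp < {end_datetime:DateTime64}
ORDER BY timestamp
"""
```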


class JobAnnotationSpeed(PrimaryMetricBase):
@@ -66,47 +89,40 @@ def get_track_count(annotations):

return count

def get_default():
return {
"data_series": {
"object_count": [],
"working_time": [],
}
}

# Calculate object count
annotations = dm.task.get_job_data(self._db_obj.id)
object_count = 0
object_count += get_tags_count(annotations)
object_count += get_shapes_count(annotations)
object_count += get_track_count(annotations)

timestamp = self._get_utc_now()
start_datetime = self._db_obj.created_date
timestamp = self._db_obj.updated_date
timestamp_str = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")

report = self._db_obj.analytics_report
if report is None:
statistics = get_default()
else:
report = getattr(self._db_obj, "analytics_report", None)
data_series = self.get_empty()
if report is not None:
statistics = next(
filter(lambda s: s["name"] == "annotation_speed", report.statistics), get_default()
filter(lambda s: s["name"] == "annotation_speed", report.statistics), None
)

data_series = statistics["data_series"]
if statistics is not None:
data_series = deepcopy(statistics["data_series"])

last_entry_count = 0
start_datetime = self._db_obj.created_date
if data_series["object_count"]:
last_entry = data_series["object_count"][-1]
last_entry_timestamp = parser.parse(last_entry["datetime"])

if last_entry_timestamp.date() == timestamp.date():
# remove last entry, it will be re-calculated below, because of the same date
data_series["object_count"] = data_series["object_count"][:-1]
data_series["working_time"] = data_series["working_time"][:-1]

if len(data_series["object_count"]):
last_last_entry = data_series["object_count"][-1]
start_datetime = parser.parse(last_last_entry["datetime"])
last_entry_count = last_last_entry["value"]
current_last_entry = data_series["object_count"][-1]
start_datetime = parser.parse(current_last_entry["datetime"])
last_entry_count = current_last_entry["value"]
else:
last_entry_count = last_entry["value"]
start_datetime = parser.parse(last_entry["datetime"])
@@ -118,21 +134,21 @@ def get_default():
}
)

# Calculate working time
rows = list(
self._data_extractor.extract_for_job(
self._db_obj.id,
{
"start_datetime": start_datetime,
"end_datetime": self._get_utc_now(),
},
)
)

value = (rows[0][0] if len(rows) else 0) / (1000 * 3600)
working_time = 0
for row in rows:
wt, datetime = row
if start_datetime <= datetime < timestamp:
working_time += wt

data_series["working_time"].append(
{
"value": value,
"value": working_time / (1000 * 3600),
"datetime": timestamp_str,
}
)
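Two details of the series update above are easy to miss: a trailing entry from the same calendar date is dropped and recomputed, and `working_time` is now summed client-side over the half-open window `[start_datetime, timestamp)` rather than by a single ClickHouse `sum(...)`. (Note that `wt, datetime = row` shadows the `datetime` import inside the loop.) A toy check of the windowing logic, with made-up timestamps:

```python
# Toy verification of the [start_datetime, timestamp) window (not PR code).
from datetime import datetime, timezone

start_datetime = datetime(2024, 5, 1, tzinfo=timezone.utc)
timestamp = datetime(2024, 5, 3, tzinfo=timezone.utc)  # job's updated_date

rows = [  # (working_time_ms, event_timestamp) pairs, as the extractor yields
    (1200, datetime(2024, 4, 30, tzinfo=timezone.utc)),    # before window: skipped
    (3600000, datetime(2024, 5, 2, tzinfo=timezone.utc)),  # inside: counted
    (500, datetime(2024, 5, 3, tzinfo=timezone.utc)),      # at end bound: skipped
]

working_time = sum(wt for wt, ts in rows if start_datetime <= ts < timestamp)
assert working_time == 3600000  # ms; / (1000 * 3600) -> 1.0 hours, as in the diff
```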
cvat/apps/analytics_report/report/primary_metrics/annotation_time.py
@@ -2,6 +2,8 @@
#
# SPDX-License-Identifier: MIT

from datetime import datetime

from cvat.apps.analytics_report.models import ViewChoice
from cvat.apps.analytics_report.report.primary_metrics.base import (
DataExtractorBase,
@@ -10,13 +12,34 @@


class JobAnnotationTimeExtractor(DataExtractorBase):
def __init__(self, job_id: int = None, task_ids: list[int] = None):
super().__init__(job_id, task_ids)
def __init__(
self,
start_datetime: datetime,
end_datetime: datetime,
job_id: int = None,
task_ids: list[int] = None,
):
super().__init__(start_datetime, end_datetime, job_id, task_ids)

SELECT = ["job_id", "timestamp", "obj_val"]
WHERE = []

if task_ids is not None:
self._query = "SELECT job_id, timestamp, obj_val FROM events WHERE scope='update:job' AND task_id IN ({task_ids:Array(UInt64)}) AND obj_name='state' ORDER BY timestamp ASC"
WHERE.append("task_id IN ({task_ids:Array(UInt64)})")
elif job_id is not None:
self._query = "SELECT job_id, timestamp, obj_val FROM events WHERE scope='update:job' AND job_id={job_id:UInt64} AND obj_name='state' ORDER BY timestamp ASC"
WHERE.append("job_id={job_id:UInt64}")

WHERE.extend(
[
"scope='update:job'",
"obj_name='state'",
"timestamp >= {start_datetime:DateTime64}",
"timestamp < {end_datetime:DateTime64}",
]
)

# bandit false alarm
self._query = f"SELECT {', '.join(SELECT)} FROM events WHERE {' AND '.join(WHERE)} ORDER BY timestamp ASC" # nosec B608
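The annotation-time extractor follows the same composition pattern: only `update:job` state-change events are selected, now also bounded by the report window. The assembled single-job query, reconstructed from the lists above for readability:

```python
# Reconstructed single-job query (illustrative, not a literal from the PR):
assembled_job_query = """
SELECT job_id, timestamp, obj_val
FROM events
WHERE job_id={job_id:UInt64}
  AND scope='update:job'
  AND obj_name='state'
  AND timestamp >= {start_datetime:DateTime64}
  AND timestamp < {end_datetime:DateTime64}
ORDER BY timestamp ASC
"""
```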


class JobAnnotationTime(PrimaryMetricBase):
19 changes: 15 additions & 4 deletions cvat/apps/analytics_report/report/primary_metrics/base.py
@@ -3,17 +3,25 @@
# SPDX-License-Identifier: MIT

from abc import ABCMeta, abstractmethod
from collections import ChainMap
from datetime import datetime, timezone

from cvat.apps.analytics_report.report.primary_metrics.utils import make_clickhouse_query


class DataExtractorBase:
def __init__(self, job_id: int = None, task_ids: list[int] = None):
def __init__(
self,
start_datetime: datetime,
end_datetime: datetime,
job_id: int = None,
task_ids: list[int] = None,
):
# Raw SQL queries are used to execute ClickHouse queries, as there is no ORM available here
self._query = None
self._parameters = {}
self._parameters = {
"start_datetime": start_datetime,
"end_datetime": end_datetime,
}
self._rows = []
self._initialized = False

@@ -28,7 +36,10 @@ def _make_clickhouse_query(self, parameters):
def extract_for_job(self, job_id: int, extras: dict = None):
if not self._initialized:
self._rows = self._make_clickhouse_query(
ChainMap(self._parameters, extras or {})
{
key: value
for key, value in list(self._parameters.items()) + list((extras or {}).items())
}
).result_rows
self._initialized = True
return map(lambda x: x[1:], filter(lambda x: x[0] == job_id, self._rows))
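One behavioral subtlety in the merge above: the removed `ChainMap(self._parameters, extras)` resolved key collisions in favor of the constructor parameters, whereas the new concatenated-items comprehension lets `extras` win, since later assignments overwrite earlier ones. If that flip is intended, dict unpacking expresses it more idiomatically; a runnable toy comparison (not the PR's code):

```python
# Demonstrates the precedence difference between the old and new merges.
from collections import ChainMap

params = {"start_datetime": "2024-05-01", "end_datetime": "2024-05-06"}
extras = {"end_datetime": "2024-05-03"}

chained = ChainMap(params, extras)   # removed behavior: params shadow extras
merged = {**params, **extras}        # new behavior: extras overwrite params

assert chained["end_datetime"] == "2024-05-06"
assert merged["end_datetime"] == "2024-05-03"
```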
32 changes: 28 additions & 4 deletions cvat/apps/analytics_report/report/primary_metrics/objects.py
@@ -2,6 +2,8 @@
#
# SPDX-License-Identifier: MIT

from datetime import datetime

from cvat.apps.analytics_report.models import GranularityChoice, ViewChoice
from cvat.apps.analytics_report.report.primary_metrics.base import (
DataExtractorBase,
@@ -10,13 +12,35 @@


class JobObjectsExtractor(DataExtractorBase):
def __init__(self, job_id: int = None, task_ids: list[int] = None):
super().__init__(job_id, task_ids)
def __init__(
self,
start_datetime: datetime,
end_datetime: datetime,
job_id: int = None,
task_ids: list[int] = None,
):
super().__init__(start_datetime, end_datetime, job_id, task_ids)

SELECT = ["job_id", "toStartOfDay(timestamp) as day", "scope", "sum(count)"]
WHERE = []

if task_ids is not None:
self._query = "SELECT job_id, toStartOfDay(timestamp) as day, scope, sum(count) FROM events WHERE scope IN ({scopes:Array(String)}) AND task_id IN ({task_ids:Array(UInt64)}) GROUP BY scope, day, job_id ORDER BY day ASC"
WHERE.append("task_id IN ({task_ids:Array(UInt64)})")
elif job_id is not None:
self._query = "SELECT job_id, toStartOfDay(timestamp) as day, scope, sum(count) FROM events WHERE scope IN ({scopes:Array(String)}) AND job_id = {job_id:UInt64} GROUP BY scope, day, job_id ORDER BY day ASC"
WHERE.append("job_id={job_id:UInt64}")

WHERE.extend(
[
"scope IN ({scopes:Array(String)})",
"timestamp >= {start_datetime:DateTime64}",
"timestamp < {end_datetime:DateTime64}",
]
)

GROUP_BY = ["scope", "day", "job_id"]

# bandit false alarm
self._query = f"SELECT {', '.join(SELECT)} FROM events WHERE {' AND '.join(WHERE)} GROUP BY {', '.join(GROUP_BY)} ORDER BY day ASC" # nosec B608
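For the objects metric the assembled query keeps the original per-day aggregation and adds the same window bounds. The single-job variant, reconstructed from the `SELECT`/`WHERE`/`GROUP_BY` lists above:

```python
# Reconstructed single-job query (illustrative, not a literal from the PR):
assembled_job_query = """
SELECT job_id, toStartOfDay(timestamp) as day, scope, sum(count)
FROM events
WHERE job_id={job_id:UInt64}
  AND scope IN ({scopes:Array(String)})
  AND timestamp >= {start_datetime:DateTime64}
  AND timestamp < {end_datetime:DateTime64}
GROUP BY scope, day, job_id
ORDER BY day ASC
"""
```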


class JobObjects(PrimaryMetricBase):