
Commit 6a85e61

Optimized requests to analytics DB, using timestamps, to avoid going through the whole table (#7833)
bsekachev authored May 6, 2024
1 parent 217bab8 commit 6a85e61
Showing 6 changed files with 185 additions and 55 deletions.
@@ -0,0 +1,4 @@
+### Fixed
+
+- Longer analytics report calculation because of inefficient requests to analytics db
+  (<https://github.com/cvat-ai/cvat/pull/7833>)
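
The core of the change: every query against the ClickHouse events table is now bounded by a half-open [start, end) timestamp window derived from job/task/project creation and update dates, so the server can prune rows instead of scanning the whole table. A minimal sketch of the pattern (the table and column names mirror the queries in the diffs below; the helper itself is illustrative, not code from this PR):

from datetime import datetime

def bounded_working_time_query(start: datetime, end: datetime) -> tuple[str, dict]:
    # Half-open window: the end bound is padded by one second upstream,
    # so events logged exactly at updated_date still fall inside the range.
    query = (
        "SELECT job_id, JSONExtractUInt(payload, 'working_time') as wt, timestamp "
        "FROM events "
        "WHERE timestamp >= {start_datetime:DateTime64} "
        "AND timestamp < {end_datetime:DateTime64} "
        "ORDER BY timestamp"
    )
    # {name:Type} placeholders are ClickHouse server-side query parameters
    return query, {"start_datetime": start, "end_datetime": end}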
76 changes: 64 additions & 12 deletions cvat/apps/analytics_report/report/create.py
@@ -2,7 +2,7 @@
 #
 # SPDX-License-Identifier: MIT

-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import Union

 import django_rq
@@ -132,12 +132,26 @@ def _check_analytics_report(
         queryset = Job.objects.select_related("analytics_report")
         db_job = queryset.get(pk=cvat_job_id)

+        start_timestamp = db_job.created_date
+        end_timestamp = db_job.updated_date + timedelta(seconds=1)
+
         db_report = cls._get_analytics_report(db_job)
         primary_metric_extractors = dict(
             (
-                (JobObjects.key(), JobObjectsExtractor(cvat_job_id)),
-                (JobAnnotationSpeed.key(), JobAnnotationSpeedExtractor(cvat_job_id)),
-                (JobAnnotationTime.key(), JobAnnotationTimeExtractor(cvat_job_id)),
+                (
+                    JobObjects.key(),
+                    JobObjectsExtractor(start_timestamp, end_timestamp, cvat_job_id),
+                ),
+                (
+                    JobAnnotationSpeed.key(),
+                    JobAnnotationSpeedExtractor(
+                        start_timestamp, end_timestamp, cvat_job_id
+                    ),
+                ),
+                (
+                    JobAnnotationTime.key(),
+                    JobAnnotationTimeExtractor(start_timestamp, end_timestamp, cvat_job_id),
+                ),
             )
         )
         db_report = cls()._compute_report_for_job(
@@ -158,18 +172,30 @@ def _check_analytics_report(
         queryset = Task.objects.select_related("analytics_report").prefetch_related(
             "segment_set__job_set"
         )
         db_task = queryset.get(pk=cvat_task_id)

         db_report = cls._get_analytics_report(db_task)
+
+        start_timestamp = db_task.created_date
+        end_timestamp = db_task.updated_date + timedelta(seconds=1)
+
         primary_metric_extractors = dict(
             (
-                (JobObjects.key(), JobObjectsExtractor(task_ids=[cvat_task_id])),
+                (
+                    JobObjects.key(),
+                    JobObjectsExtractor(
+                        start_timestamp, end_timestamp, task_ids=[cvat_task_id]
+                    ),
+                ),
                 (
                     JobAnnotationSpeed.key(),
-                    JobAnnotationSpeedExtractor(task_ids=[cvat_task_id]),
+                    JobAnnotationSpeedExtractor(
+                        start_timestamp, end_timestamp, task_ids=[cvat_task_id]
+                    ),
                 ),
                 (
                     JobAnnotationTime.key(),
-                    JobAnnotationTimeExtractor(task_ids=[cvat_task_id]),
+                    JobAnnotationTimeExtractor(
+                        start_timestamp, end_timestamp, task_ids=[cvat_task_id]
+                    ),
                 ),
             )
         )
@@ -208,12 +234,38 @@ def _check_analytics_report(

         db_project = queryset.get(pk=cvat_project_id)
         db_report = cls._get_analytics_report(db_project)
-        task_ids = [item["id"] for item in db_project.tasks.values("id")]
+
+        tasks_data = db_project.tasks.values("id", "created_date", "updated_date")
+        start_timestamp = (
+            min(item["created_date"] for item in tasks_data)
+            if len(tasks_data)
+            else db_project.created_date
+        )
+        end_timestamp = (
+            max(item["updated_date"] for item in tasks_data)
+            if len(tasks_data)
+            else db_project.updated_date
+        ) + timedelta(seconds=1)
+        task_ids = [item["id"] for item in tasks_data]

         primary_metric_extractors = dict(
             (
-                (JobObjects.key(), JobObjectsExtractor(task_ids=task_ids)),
-                (JobAnnotationSpeed.key(), JobAnnotationSpeedExtractor(task_ids=task_ids)),
-                (JobAnnotationTime.key(), JobAnnotationTimeExtractor(task_ids=task_ids)),
+                (
+                    JobObjects.key(),
+                    JobObjectsExtractor(start_timestamp, end_timestamp, task_ids=task_ids),
+                ),
+                (
+                    JobAnnotationSpeed.key(),
+                    JobAnnotationSpeedExtractor(
+                        start_timestamp, end_timestamp, task_ids=task_ids
+                    ),
+                ),
+                (
+                    JobAnnotationTime.key(),
+                    JobAnnotationTimeExtractor(
+                        start_timestamp, end_timestamp, task_ids=task_ids
+                    ),
+                ),
             )
         )
         db_report, task_reports, job_reports = cls()._compute_report_for_project(
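
The windows computed above follow one rule at every level: [created_date, updated_date + 1s) for a job or a task, and the min/max across tasks for a project, falling back to the project's own dates when it has no tasks. A condensed, standalone sketch of that logic (helper names are hypothetical):

from datetime import datetime, timedelta

def job_or_task_window(created: datetime, updated: datetime) -> tuple[datetime, datetime]:
    # +1 second keeps the half-open interval inclusive of the last update
    return created, updated + timedelta(seconds=1)

def project_window(project, tasks_data) -> tuple[datetime, datetime]:
    # tasks_data: rows with "created_date"/"updated_date", as queried in the diff;
    # an empty project falls back to its own dates
    if tasks_data:
        start = min(item["created_date"] for item in tasks_data)
        end = max(item["updated_date"] for item in tasks_data)
    else:
        start, end = project.created_date, project.updated_date
    return start, end + timedelta(seconds=1)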
cvat/apps/analytics_report/report/primary_metrics/annotation_speed.py
@@ -2,6 +2,9 @@
 #
 # SPDX-License-Identifier: MIT

+from copy import deepcopy
+from datetime import datetime
+
 from dateutil import parser

 import cvat.apps.dataset_manager as dm
@@ -19,13 +22,33 @@


 class JobAnnotationSpeedExtractor(DataExtractorBase):
-    def __init__(self, job_id: int = None, task_ids: list[int] = None):
-        super().__init__(job_id, task_ids)
+    def __init__(
+        self,
+        start_datetime: datetime,
+        end_datetime: datetime,
+        job_id: int = None,
+        task_ids: list[int] = None,
+    ):
+        super().__init__(start_datetime, end_datetime, job_id, task_ids)
+
+        SELECT = ["job_id", "JSONExtractUInt(payload, 'working_time') as wt", "timestamp"]
+        WHERE = []

         if task_ids is not None:
-            self._query = "SELECT job_id, sum(JSONExtractUInt(payload, 'working_time')) as wt FROM events WHERE task_id IN ({task_ids:Array(UInt64)}) AND timestamp >= {start_datetime:DateTime64} AND timestamp < {end_datetime:DateTime64} GROUP BY job_id"
+            WHERE.append("task_id IN ({task_ids:Array(UInt64)})")
         elif job_id is not None:
-            self._query = "SELECT job_id, sum(JSONExtractUInt(payload, 'working_time')) as wt FROM events WHERE job_id={job_id:UInt64} AND timestamp >= {start_datetime:DateTime64} AND timestamp < {end_datetime:DateTime64} GROUP BY job_id"
+            WHERE.append("job_id={job_id:UInt64}")
+
+        WHERE.extend(
+            [
+                "wt > 0",
+                "timestamp >= {start_datetime:DateTime64}",
+                "timestamp < {end_datetime:DateTime64}",
+            ]
+        )
+
+        # bandit false alarm
+        self._query = f"SELECT {', '.join(SELECT)} FROM events WHERE {' AND '.join(WHERE)} ORDER BY timestamp"  # nosec B608


class JobAnnotationSpeed(PrimaryMetricBase):
@@ -66,47 +89,40 @@ def get_track_count(annotations):

             return count

-        def get_default():
-            return {
-                "data_series": {
-                    "object_count": [],
-                    "working_time": [],
-                }
-            }
-
         # Calculate object count
         annotations = dm.task.get_job_data(self._db_obj.id)
         object_count = 0
         object_count += get_tags_count(annotations)
         object_count += get_shapes_count(annotations)
         object_count += get_track_count(annotations)

-        timestamp = self._get_utc_now()
+        start_datetime = self._db_obj.created_date
+        timestamp = self._db_obj.updated_date
         timestamp_str = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")

-        report = self._db_obj.analytics_report
-        if report is None:
-            statistics = get_default()
-        else:
+        report = getattr(self._db_obj, "analytics_report", None)
+        data_series = self.get_empty()
+        if report is not None:
             statistics = next(
-                filter(lambda s: s["name"] == "annotation_speed", report.statistics), get_default()
+                filter(lambda s: s["name"] == "annotation_speed", report.statistics), None
             )
-
-        data_series = statistics["data_series"]
+            if statistics is not None:
+                data_series = deepcopy(statistics["data_series"])

         last_entry_count = 0
-        start_datetime = self._db_obj.created_date
         if data_series["object_count"]:
             last_entry = data_series["object_count"][-1]
             last_entry_timestamp = parser.parse(last_entry["datetime"])

             if last_entry_timestamp.date() == timestamp.date():
                 # remove last entry, it will be re-calculated below, because of the same date
                 data_series["object_count"] = data_series["object_count"][:-1]
                 data_series["working_time"] = data_series["working_time"][:-1]

                 if len(data_series["object_count"]):
-                    last_last_entry = data_series["object_count"][-1]
-                    start_datetime = parser.parse(last_last_entry["datetime"])
-                    last_entry_count = last_last_entry["value"]
+                    current_last_entry = data_series["object_count"][-1]
+                    start_datetime = parser.parse(current_last_entry["datetime"])
+                    last_entry_count = current_last_entry["value"]
             else:
                 last_entry_count = last_entry["value"]
                 start_datetime = parser.parse(last_entry["datetime"])
@@ -118,21 +134,21 @@
                 }
             )

-        # Calculate working time
         rows = list(
             self._data_extractor.extract_for_job(
                 self._db_obj.id,
-                {
-                    "start_datetime": start_datetime,
-                    "end_datetime": self._get_utc_now(),
-                },
             )
         )

-        value = (rows[0][0] if len(rows) else 0) / (1000 * 3600)
+        working_time = 0
+        for row in rows:
+            wt, datetime = row
+            if start_datetime <= datetime < timestamp:
+                working_time += wt

         data_series["working_time"].append(
             {
-                "value": value,
+                "value": working_time / (1000 * 3600),
                 "datetime": timestamp_str,
             }
         )
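
For reference, joining the pieces above for the single-job case yields the query below; this is just the f-string from the diff expanded by hand, with the {name:Type} placeholders left for server-side binding:

SELECT = ["job_id", "JSONExtractUInt(payload, 'working_time') as wt", "timestamp"]
WHERE = [
    "job_id={job_id:UInt64}",  # the task_ids branch uses an IN clause here instead
    "wt > 0",
    "timestamp >= {start_datetime:DateTime64}",
    "timestamp < {end_datetime:DateTime64}",
]
query = f"SELECT {', '.join(SELECT)} FROM events WHERE {' AND '.join(WHERE)} ORDER BY timestamp"
print(query)
# SELECT job_id, JSONExtractUInt(payload, 'working_time') as wt, timestamp
# FROM events WHERE job_id={job_id:UInt64} AND wt > 0
# AND timestamp >= {start_datetime:DateTime64} AND timestamp < {end_datetime:DateTime64}
# ORDER BY timestamp
# (printed on one line; wrapped here for readability)

Note that filtering on the wt alias inside WHERE relies on ClickHouse's alias resolution; standard SQL would reject it.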
cvat/apps/analytics_report/report/primary_metrics/annotation_time.py
@@ -2,6 +2,8 @@
 #
 # SPDX-License-Identifier: MIT

+from datetime import datetime
+
 from cvat.apps.analytics_report.models import ViewChoice
 from cvat.apps.analytics_report.report.primary_metrics.base import (
     DataExtractorBase,
@@ -10,13 +12,34 @@


 class JobAnnotationTimeExtractor(DataExtractorBase):
-    def __init__(self, job_id: int = None, task_ids: list[int] = None):
-        super().__init__(job_id, task_ids)
+    def __init__(
+        self,
+        start_datetime: datetime,
+        end_datetime: datetime,
+        job_id: int = None,
+        task_ids: list[int] = None,
+    ):
+        super().__init__(start_datetime, end_datetime, job_id, task_ids)
+
+        SELECT = ["job_id", "timestamp", "obj_val"]
+        WHERE = []

         if task_ids is not None:
-            self._query = "SELECT job_id, timestamp, obj_val FROM events WHERE scope='update:job' AND task_id IN ({task_ids:Array(UInt64)}) AND obj_name='state' ORDER BY timestamp ASC"
+            WHERE.append("task_id IN ({task_ids:Array(UInt64)})")
         elif job_id is not None:
-            self._query = "SELECT job_id, timestamp, obj_val FROM events WHERE scope='update:job' AND job_id={job_id:UInt64} AND obj_name='state' ORDER BY timestamp ASC"
+            WHERE.append("job_id={job_id:UInt64}")
+
+        WHERE.extend(
+            [
+                "scope='update:job'",
+                "obj_name='state'",
+                "timestamp >= {start_datetime:DateTime64}",
+                "timestamp < {end_datetime:DateTime64}",
+            ]
+        )
+
+        # bandit false alarm
+        self._query = f"SELECT {', '.join(SELECT)} FROM events WHERE {' AND '.join(WHERE)} ORDER BY timestamp ASC"  # nosec B608

class JobAnnotationTime(PrimaryMetricBase):
19 changes: 15 additions & 4 deletions cvat/apps/analytics_report/report/primary_metrics/base.py
@@ -3,17 +3,25 @@
 # SPDX-License-Identifier: MIT

 from abc import ABCMeta, abstractmethod
-from collections import ChainMap
+from datetime import datetime, timezone

 from cvat.apps.analytics_report.report.primary_metrics.utils import make_clickhouse_query


 class DataExtractorBase:
-    def __init__(self, job_id: int = None, task_ids: list[int] = None):
+    def __init__(
+        self,
+        start_datetime: datetime,
+        end_datetime: datetime,
+        job_id: int = None,
+        task_ids: list[int] = None,
+    ):
         # Raw SQL queries are used to execute ClickHouse queries, as there is no ORM available here
         self._query = None
-        self._parameters = {}
+        self._parameters = {
+            "start_datetime": start_datetime,
+            "end_datetime": end_datetime,
+        }
         self._rows = []
         self._initialized = False

@@ -28,7 +36,10 @@ def _make_clickhouse_query(self, parameters):
     def extract_for_job(self, job_id: int, extras: dict = None):
         if not self._initialized:
             self._rows = self._make_clickhouse_query(
-                ChainMap(self._parameters, extras or {})
+                {
+                    key: value
+                    for key, value in list(self._parameters.items()) + list((extras or {}).items())
+                }
             ).result_rows
             self._initialized = True
         return map(lambda x: x[1:], filter(lambda x: x[0] == job_id, self._rows))
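
A side effect of dropping ChainMap worth noting: ChainMap(self._parameters, extras) resolves duplicate keys in favor of self._parameters, while the new dict built from concatenated items lets extras override the constructor defaults. A minimal demonstration with illustrative values:

from collections import ChainMap

defaults = {"start_datetime": "2024-01-01", "end_datetime": "2024-05-01"}
extras = {"end_datetime": "2024-02-01"}

old = dict(ChainMap(defaults, extras))  # first mapping wins on duplicate keys
new = {k: v for k, v in list(defaults.items()) + list(extras.items())}  # later items win

assert old["end_datetime"] == "2024-05-01"
assert new["end_datetime"] == "2024-02-01"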
32 changes: 28 additions & 4 deletions cvat/apps/analytics_report/report/primary_metrics/objects.py
@@ -2,6 +2,8 @@
 #
 # SPDX-License-Identifier: MIT

+from datetime import datetime
+
 from cvat.apps.analytics_report.models import GranularityChoice, ViewChoice
 from cvat.apps.analytics_report.report.primary_metrics.base import (
     DataExtractorBase,
@@ -10,13 +12,35 @@


 class JobObjectsExtractor(DataExtractorBase):
-    def __init__(self, job_id: int = None, task_ids: list[int] = None):
-        super().__init__(job_id, task_ids)
+    def __init__(
+        self,
+        start_datetime: datetime,
+        end_datetime: datetime,
+        job_id: int = None,
+        task_ids: list[int] = None,
+    ):
+        super().__init__(start_datetime, end_datetime, job_id, task_ids)
+
+        SELECT = ["job_id", "toStartOfDay(timestamp) as day", "scope", "sum(count)"]
+        WHERE = []

         if task_ids is not None:
-            self._query = "SELECT job_id, toStartOfDay(timestamp) as day, scope, sum(count) FROM events WHERE scope IN ({scopes:Array(String)}) AND task_id IN ({task_ids:Array(UInt64)}) GROUP BY scope, day, job_id ORDER BY day ASC"
+            WHERE.append("task_id IN ({task_ids:Array(UInt64)})")
         elif job_id is not None:
-            self._query = "SELECT job_id, toStartOfDay(timestamp) as day, scope, sum(count) FROM events WHERE scope IN ({scopes:Array(String)}) AND job_id = {job_id:UInt64} GROUP BY scope, day, job_id ORDER BY day ASC"
+            WHERE.append("job_id={job_id:UInt64}")
+
+        WHERE.extend(
+            [
+                "scope IN ({scopes:Array(String)})",
+                "timestamp >= {start_datetime:DateTime64}",
+                "timestamp < {end_datetime:DateTime64}",
+            ]
+        )
+
+        GROUP_BY = ["scope", "day", "job_id"]
+
+        # bandit false alarm
+        self._query = f"SELECT {', '.join(SELECT)} FROM events WHERE {' AND '.join(WHERE)} GROUP BY {', '.join(GROUP_BY)} ORDER BY day ASC"  # nosec B608


class JobObjects(PrimaryMetricBase):
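
The .result_rows accessor used in base.py matches the clickhouse-connect client API, which also implements the {name:Type} server-side binding these queries rely on. A sketch of running the objects query by hand, assuming clickhouse-connect; the host, job id, and scope names are placeholders rather than values from this PR:

from datetime import datetime, timedelta

import clickhouse_connect

client = clickhouse_connect.get_client(host="localhost")  # placeholder connection

result = client.query(
    "SELECT job_id, toStartOfDay(timestamp) as day, scope, sum(count) "
    "FROM events "
    "WHERE job_id={job_id:UInt64} AND scope IN ({scopes:Array(String)}) "
    "AND timestamp >= {start_datetime:DateTime64} AND timestamp < {end_datetime:DateTime64} "
    "GROUP BY scope, day, job_id ORDER BY day ASC",
    parameters={
        "job_id": 42,                                  # placeholder job
        "scopes": ["create:shapes", "update:shapes"],  # illustrative scope names
        "start_datetime": datetime(2024, 1, 1),
        "end_datetime": datetime(2024, 5, 6) + timedelta(seconds=1),
    },
)
for job_id, day, scope, count in result.result_rows:
    print(job_id, day, scope, count)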
