[QProf] DCSlowEventsTable collection (#1255)
* [QProf] DCSlowEventsTable collection

 - Solve https://jira.verticacorp.com/jira/browse/VER-95891?filter=-1

* tests correction
oualib authored Aug 4, 2024
1 parent 1a208d7 commit b8aaf74
Showing 4 changed files with 107 additions and 5 deletions.
100 changes: 100 additions & 0 deletions verticapy/performance/vertica/collection/collection_tables.py
@@ -47,6 +47,7 @@ class AllTableTypes(Enum):
DC_EXPLAIN_PLANS = "dc_explain_plans"
DC_QUERY_EXECUTIONS = "dc_query_executions"
DC_REQUESTS_ISSUED = "dc_requests_issued"
DC_SCAN_EVENTS = "dc_scan_events"
DC_SLOW_EVENTS = "dc_slow_events"
EXECUTION_ENGINE_PROFILES = "execution_engine_profiles"
EXPORT_EVENTS = "export_events"
@@ -773,6 +774,7 @@ def getAllCollectionTables(
AllTableTypes.DC_EXPLAIN_PLANS,
AllTableTypes.DC_QUERY_EXECUTIONS,
AllTableTypes.DC_REQUESTS_ISSUED,
AllTableTypes.DC_SCAN_EVENTS,
AllTableTypes.DC_SLOW_EVENTS,
AllTableTypes.EXECUTION_ENGINE_PROFILES,
AllTableTypes.EXPORT_EVENTS,
@@ -787,6 +789,7 @@ def getAllCollectionTables(
AllTableTypes.DC_EXPLAIN_PLANS,
AllTableTypes.DC_QUERY_EXECUTIONS,
AllTableTypes.DC_REQUESTS_ISSUED,
AllTableTypes.DC_SCAN_EVENTS,
AllTableTypes.DC_SLOW_EVENTS,
AllTableTypes.EXECUTION_ENGINE_PROFILES,
# Host resources lacks txn_id, stmt_id
@@ -860,6 +863,8 @@ def collectionTableFactory(
return DCQueryExecutionsTable(target_schema, key)
if table_type == AllTableTypes.DC_REQUESTS_ISSUED:
return DCRequestsIssuedTable(target_schema, key)
if table_type == AllTableTypes.DC_SCAN_EVENTS:
return DCScanEventsTable(target_schema, key)
if table_type == AllTableTypes.DC_SLOW_EVENTS:
return DCSlowEventsTable(target_schema, key)
if table_type == AllTableTypes.EXECUTION_ENGINE_PROFILES:
@@ -1265,6 +1270,101 @@ def get_pandas_column_type_adjustments(self) -> Mapping[str, str]:
return {"query_start_epoch": "Int64", "digest": "Int64"}


################ dc_scan_events ###################
class DCScanEventsTable(CollectionTable):
"""
``DCScanEventsTable`` stores data from the system table
dc_scan_events
"""

def __init__(self, table_schema: str, key: str) -> None:
super().__init__(AllTableTypes.DC_SCAN_EVENTS, table_schema, key)

def get_create_table_sql(self) -> str:
return f"""
CREATE TABLE IF NOT EXISTS {self.get_import_name_fq()}
(
"time" timestamptz,
node_name varchar(128),
session_id varchar(128),
user_id int,
user_name varchar(128),
transaction_id int,
statement_id int,
request_id int,
plan_id int,
localplan_id int,
operator_id int,
table_oid int,
projection_oid int,
container_index int,
container_description varchar(512),
event_type varchar(512),
event_description varchar(512)
);
"""

def get_create_projection_sql(self) -> str:
import_name = self.get_import_name()
fq_proj_name = self.get_super_proj_name_fq()
import_name_fq = self.get_import_name_fq()
return f"""
CREATE PROJECTION IF NOT EXISTS {fq_proj_name}
/*+basename({import_name}),createtype(A)*/
(
"time",
node_name,
session_id,
user_id,
user_name,
transaction_id,
statement_id,
request_id,
plan_id,
localplan_id,
operator_id,
table_oid,
projection_oid,
container_index,
container_description,
event_type,
event_description
)
AS
SELECT {import_name}."time",
{import_name}.node_name,
{import_name}.session_id,
{import_name}.user_id,
{import_name}.user_name,
{import_name}.transaction_id,
{import_name}.statement_id,
{import_name}.request_id,
{import_name}.plan_id,
{import_name}.localplan_id,
{import_name}.operator_id,
{import_name}.table_oid,
{import_name}.projection_oid,
{import_name}.container_index,
{import_name}.container_description,
{import_name}.event_type,
{import_name}.event_description
FROM {import_name_fq}
ORDER BY {import_name}.transaction_id,
{import_name}.statement_id,
{import_name}.node_name,
{import_name}.request_id
SEGMENTED BY hash({import_name}."time",
{import_name}.user_id,
{import_name}.transaction_id,
{import_name}.statement_id,
{import_name}.request_id)
ALL NODES;
"""

def get_pandas_column_type_adjustments(self) -> Mapping[str, str]:
return {}


################ dc_slow_events ###################
class DCSlowEventsTable(CollectionTable):
"""
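To make the new plumbing concrete, a brief usage sketch (not part of this commit): it assumes collectionTableFactory accepts (table_type, target_schema, key), as suggested by the factory hunk above, and the schema and key values are placeholders.

# Hypothetical sketch, not from the diff above.
# Assumes the factory signature collectionTableFactory(table_type, target_schema, key),
# mirroring the DCScanEventsTable(target_schema, key) call it dispatches to.
from verticapy.performance.vertica.collection.collection_tables import (
    AllTableTypes,
    collectionTableFactory,
)

scan_table = collectionTableFactory(
    AllTableTypes.DC_SCAN_EVENTS,  # enum member added by this commit
    "qprof_target_schema",         # placeholder target schema
    "example_key",                 # placeholder collection key
)
print(scan_table.get_create_table_sql())       # dc_scan_events CREATE TABLE DDL shown above
print(scan_table.get_create_projection_sql())  # matching superprojection DDL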
4 changes: 2 additions & 2 deletions verticapy/performance/vertica/collection/profile_import.py
@@ -323,8 +323,8 @@ def _handle_missing_files(
the value of ``self.raise_when_missing_files``
"""
if (
- len(missing_files) <= 1
- ): # To change to 0 when we add the dc_slow_events in the archive
+ len(missing_files) <= 2
+ ): # To change to 0 when we add the dc_slow_events / dc_scan_events in the archive
return
message = (
f"Bundle {self.filename} unpacked in directory {unpack_dir}"
4 changes: 3 additions & 1 deletion verticapy/performance/vertica/qprof.py
@@ -1273,6 +1273,8 @@ def _v_table_dict() -> dict:
"dc_explain_plans": "v_internal",
"dc_query_executions": "v_internal",
"dc_plan_activities": "v_internal",
"dc_scan_events": "v_internal",
"dc_slow_events": "v_internal",
"execution_engine_profiles": "v_monitor",
"host_resources": "v_monitor",
"query_events": "v_monitor",
@@ -1285,7 +1287,6 @@ def _v_table_dict() -> dict:
# New Tables - still not used.
"dc_lock_attempts": "v_internal",
"dc_plan_resources": "v_internal",
"dc_slow_events": "v_internal",
"configuration_parameters": "v_monitor",
"query_consumption": "v_monitor",
"query_events": "v_monitor",
@@ -2035,6 +2036,7 @@ def get_table(self, table_name: Optional[str] = None) -> Union[list, vDataFrame]
- dc_explain_plans
- dc_query_executions
- dc_requests_issued
- dc_scan_events
- dc_slow_events
- host_resources
- query_plan_profiles
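The user-facing effect of the qprof.py change is that dc_scan_events becomes a valid argument to get_table. A minimal, hedged sketch (not part of this commit): it assumes get_table is a method of the QueryProfiler class defined in this module, and the constructor argument below is an illustrative placeholder.

# Hypothetical sketch, not from the diff above.
# Only get_table("dc_scan_events") is grounded in the docstring change shown in the
# @@ -2035 hunk; the QueryProfiler constructor argument is an assumed placeholder.
from verticapy.performance.vertica.qprof import QueryProfiler

qprof = QueryProfiler((45035996273705982, 1))  # assumed (transaction_id, statement_id) pair
scan_events = qprof.get_table("dc_scan_events")  # accepted after this commit; returns a vDataFrame
print(scan_events.head(5))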
2 changes: 2 additions & 2 deletions
@@ -86,8 +86,8 @@ def test_profile_export(self, amazon_vd, schema_loader, tmp_path):
set_diff = tarfile_contents.symmetric_difference(expected_files)

assert (
- len(set_diff) <= 1
- ) # To change to 0 when we add the dc_slow_events in the archive
+ len(set_diff) <= 2
+ ) # To change to 0 when we add the dc_slow_events, dc_scan_events in the archive

def _get_set_of_tables_in_schema(self, target_schema, key):
"""
