[CDF-23168] 😄 Dump schema inconsistency (#1380)
* fix: standardize columns

* fix: standardize columns

* fix: ignore warnings
doctrino authored Jan 20, 2025
1 parent a12a3dc commit 9bd9851
Showing 3 changed files with 62 additions and 2 deletions.
CHANGELOG.cdf-tk.md (5 additions, 0 deletions)
@@ -17,6 +17,11 @@ Changes are grouped as follows:

 ## TBD

+### Improved
+
+- The `cdf dump asset` and `cdf dump timeseries` commands now always dump tables with
+  the same columns in the same order.
+
 ### Fixed

 - The `cdf build` no longer raises a `ToolkitFileNotFoundError` when building without a `config.[env].yaml` file.
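The changelog entry above promises two things: a deterministic column order within each dumped file, and (via the second pass in the diffs below) an identical column set across files. A minimal pandas sketch of the ordering half, with hypothetical DataFrames standing in for dump parts:

```python
import pandas as pd

# Two hypothetical dump parts whose row dicts produce different column orders.
part_1 = pd.DataFrame([{"name": "pump", "externalId": "p1", "description": "a pump"}])
part_2 = pd.DataFrame([{"externalId": "v1", "description": "a valve", "name": "valve"}])

for df in (part_1, part_2):
    df.sort_index(axis=1, inplace=True)  # sort columns alphabetically, in place

print(list(part_1.columns))  # ['description', 'externalId', 'name']
print(list(part_2.columns))  # ['description', 'externalId', 'name']
```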
cognite_toolkit/_cdf_tk/commands/dump_assets.py (28 additions, 1 deletion)
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import warnings
 from collections import Counter, defaultdict
 from collections.abc import Callable, Iterator
 from functools import lru_cache
@@ -13,7 +14,7 @@
 from cognite.client.data_classes import Asset, AssetFilter, AssetList, DataSetWrite, DataSetWriteList
 from cognite.client.data_classes.filters import Equals
 from cognite.client.exceptions import CogniteAPIError
-from rich.progress import Progress, TaskID
+from rich.progress import Progress, TaskID, track

 from cognite_toolkit._cdf_tk.client import ToolkitClient
 from cognite_toolkit._cdf_tk.commands._base import ToolkitCommand
@@ -45,6 +46,8 @@ def __init__(self, print_warning: bool = True, skip_tracking: bool = False):
         self._used_data_sets: set[int] = set()
         self._available_data_sets: dict[int, DataSetWrite] | None = None
         self._available_hierarchies: dict[int, Asset] | None = None
+        self._written_files: list[Path] = []
+        self._used_columns: set[str] = set()

     def execute(
         self,
@@ -126,6 +129,9 @@ def execute(
                     folder_path.mkdir(parents=True, exist_ok=True)
                     file_count = file_count_by_hierarchy[group]
                     file_path = folder_path / f"part-{file_count:04}.Asset.{format_}"
+                    self._used_columns.update(df.columns)
+                    # Standardize column order
+                    df.sort_index(axis=1, inplace=True)
                     if format_ == "csv":
                         df.to_csv(file_path, index=False, encoding=self.encoding, lineterminator=self.newline)
                     elif format_ == "parquet":
@@ -134,10 +140,31 @@
                     if verbose:
                         print(f"Dumped {len(df):,} assets in {group} to {file_path}")
                     count += len(df)
+                    self._written_files.append(file_path)
                     progress.advance(write_to_file, advance=len(df))
         else:
             raise ToolkitValueError(f"Unsupported format {format_}. Supported formats are yaml, csv, parquet. ")

+        if format_ in {"csv", "parquet"} and len(self._written_files) > 1:
+            # Standardize columns across all files
+            for file_path in track(
+                self._written_files, total=len(self._written_files), description="Standardizing columns"
+            ):
+                with warnings.catch_warnings():
+                    warnings.simplefilter("ignore")
+                    if format_ == "csv":
+                        df = pd.read_csv(file_path, encoding=self.encoding, lineterminator=self.newline)
+                    else:
+                        df = pd.read_parquet(file_path)
+                for missing_column in self._used_columns - set(df.columns):
+                    df[missing_column] = None
+                # Standardize column order
+                df.sort_index(axis=1, inplace=True)
+                if format_ == "csv":
+                    df.to_csv(file_path, index=False, encoding=self.encoding, lineterminator=self.newline)
+                elif format_ == "parquet":
+                    df.to_parquet(file_path, index=False)
+
         print(f"Dumped {count:,} assets to {output_dir}")

         if self._used_labels:
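The change turns the asset dump into a two-pass scheme: the first pass streams each group to a part file while `self._used_columns` accumulates every column seen, and the second pass re-reads each part, pads any column it never saw with `None`, and rewrites it with sorted columns. A standalone sketch of that second pass, assuming CSV parts; `written_files` and `used_columns` are illustrative stand-ins for the state the command collects, not Toolkit API:

```python
import warnings
from pathlib import Path

import pandas as pd

# Stand-ins for the state gathered during the first (write) pass.
written_files = [Path("part-0000.Asset.csv"), Path("part-0001.Asset.csv")]
used_columns = {"externalId", "name", "description", "labels"}

for file_path in written_files:
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")  # e.g. pandas DtypeWarning on re-read
        df = pd.read_csv(file_path)
    for missing in used_columns - set(df.columns):
        df[missing] = None  # pad columns this part never saw
    df.sort_index(axis=1, inplace=True)  # same alphabetical order in every part
    df.to_csv(file_path, index=False)
```

Re-reading and rewriting every part costs a second I/O pass, but it avoids buffering all groups in memory just to learn the full column set up front.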
cognite_toolkit/_cdf_tk/commands/dump_timeseries.py (29 additions, 1 deletion)
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import warnings
 from collections.abc import Iterator
 from functools import lru_cache
 from pathlib import Path
@@ -17,7 +18,7 @@
 )
 from cognite.client.data_classes.filters import Equals
 from cognite.client.exceptions import CogniteAPIError
-from rich.progress import Progress, TaskID
+from rich.progress import Progress, TaskID, track

 from cognite_toolkit._cdf_tk.client import ToolkitClient
 from cognite_toolkit._cdf_tk.commands._base import ToolkitCommand
@@ -55,6 +56,9 @@ def __init__(self, print_warning: bool = True, skip_tracking: bool = False):
         self._available_data_sets: dict[int, DataSetWrite] | None = None
         self._available_hierarchies: dict[int, Asset] | None = None

+        self._written_files: list[Path] = []
+        self._used_columns: set[str] = set()
+
     def execute(
         self,
         ToolGlobals: CDFToolConfig,
@@ -130,6 +134,8 @@ def execute(
                     folder_path = output_dir / TIME_SERIES_FOLDER_NAME
                     folder_path.mkdir(parents=True, exist_ok=True)
                     file_path = folder_path / f"part-{file_count:04}.TimeSeries.{format_}"
+                    # Standardize column order
+                    df.sort_index(axis=1, inplace=True)
                     if format_ == "csv":
                         df.to_csv(
                             file_path,
@@ -143,10 +149,32 @@
                     if verbose:
                         print(f"Dumped {len(df):,} time_series to {file_path}")
                     count += len(df)
+                    self._written_files.append(file_path)
+                    self._used_columns.update(df.columns)
                     progress.advance(write_to_file, advance=len(df))
         else:
             raise ToolkitValueError(f"Unsupported format {format_}. Supported formats are yaml, csv, parquet. ")

+        if format_ in {"csv", "parquet"} and len(self._written_files) > 1:
+            # Standardize columns across all files
+            for file_path in track(
+                self._written_files, total=len(self._written_files), description="Standardizing columns"
+            ):
+                with warnings.catch_warnings():
+                    warnings.simplefilter("ignore")
+                    if format_ == "csv":
+                        df = pd.read_csv(file_path, encoding=self.encoding, lineterminator=self.newline)
+                    else:
+                        df = pd.read_parquet(file_path)
+                for missing_column in self._used_columns - set(df.columns):
+                    df[missing_column] = None
+                # Standardize column order
+                df.sort_index(axis=1, inplace=True)
+                if format_ == "csv":
+                    df.to_csv(file_path, index=False, encoding=self.encoding, lineterminator=self.newline)
+                elif format_ == "parquet":
+                    df.to_parquet(file_path, index=False)
+
         print(f"Dumped {count:,} time_series to {output_dir}")

         if self._used_data_sets:
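A plausible motivation for padding and ordering the parts identically (an inference; the commit itself only names the inconsistency): a directory of part files is easiest to consume as one dataset when every part shares a single schema. A small sketch with a hypothetical dump folder:

```python
from pathlib import Path

import pandas as pd

# Hypothetical output folder produced by `cdf dump timeseries`.
parts = sorted(Path("timeseries_dump").glob("part-*.TimeSeries.csv"))

# With identical columns in identical order, concatenation needs no
# per-part schema reconciliation.
combined = pd.concat((pd.read_csv(p) for p in parts), ignore_index=True)
print(combined.columns.tolist())
```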
