Update base.py (#30)
* Update base.py

Update the name of the keyword argument in the call to self.client.raw.tables.create() from 'table' to 'name', matching cognite-sdk versions ^6.0.0.
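
For illustration, a minimal sketch of the rename (hypothetical db/table names; assumes a `CogniteClient` with default configuration):

```python
from cognite.client import CogniteClient

client = CogniteClient()  # assumes default credentials/config are set up

# cognite-sdk < 6.0.0:
# client.raw.tables.create(db_name="src:db", table=["events", "assets"])

# cognite-sdk ^6.0.0, the keyword is now `name`:
client.raw.tables.create(db_name="src:db", name=["events", "assets"])
```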

* fix: version bump to v3.0.0-beta2

* fix: pydantic root-class
- switch to `extra="forbid"`
- which prevents misspelled properties in configurations from going unnoticed
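
A minimal sketch of the effect (model and field names are hypothetical):

```python
from pydantic_settings import BaseSettings, SettingsConfigDict


class PipelineConfig(BaseSettings):
    model_config = SettingsConfigDict(extra="forbid")
    schedule: str = "continuous"


PipelineConfig(schedule="on trigger")  # ok
PipelineConfig(shedule="on trigger")   # misspelled: now raises a ValidationError;
                                       # with extra="ignore" it was silently dropped
```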

* fix(typing): removing old typing imports
- like Dict, Set and Tuple
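
Since Python 3.9 the built-in generics cover these, e.g.:

```python
# before:
# from typing import Dict, Set
# def find_missing(existing: Dict, target: Dict) -> Dict: ...

# after (PEP 585 built-in generics, no typing import needed):
def find_missing(existing: dict, target: dict) -> dict: ...
def extract_jinja_variables(template: str) -> set[str]: ...
```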

* fix(config): set a default for `created-by`
- expected it to be visible as the "Owner" column in the extpipe listing
- but it isn't, maybe a Fusion bug?
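
Condensed from the `app_config.py` diff below, the new default on the `Pipeline` model:

```python
from typing import Optional

from pydantic import BaseModel, Field

__version__ = "3.0.0-beta2"  # normally imported from the package


class Pipeline(BaseModel):  # simplified; the real model derives from the project's settings base
    created_by: Optional[str] = Field(default=f"dataops - extpipes-cli@v{__version__}")


print(Pipeline().created_by)  # dataops - [email protected]
```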

* fix: deploy
- changing the processing
  - which wasn't working when `naming-pattern` was used
  - using ExtractionPipelineList and `as_external_ids()` now
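
A condensed sketch of the new diffing step (assumes `client` is a configured `CogniteClient` and `requested_extpipes` is the `ExtractionPipelineList` built from the config):

```python
from cognite.client.data_classes import ExtractionPipelineList

existing_extpipes = client.extraction_pipelines.list(limit=-1)
existing_ids = existing_extpipes.as_external_ids()

# split requested pipelines into create/update/delete by external id
create_extpipes = ExtractionPipelineList(
    [p for p in requested_extpipes if p.external_id not in existing_ids]
)
update_extpipes = ExtractionPipelineList(
    [p for p in requested_extpipes if p.external_id in existing_ids]
)
delete_extpipes = [
    xid for xid in existing_ids if xid not in requested_extpipes.as_external_ids()
]
```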

---------

Co-authored-by: Peter Arwanitis <[email protected]>
janne123456789 and spex66 authored Nov 24, 2023
1 parent ecee03c commit db3705f
Showing 9 changed files with 75 additions and 71 deletions.
2 changes: 1 addition & 1 deletion action.yml
@@ -31,7 +31,7 @@ inputs:
required: true
runs:
using: "docker"
-image: docker://cognite/extpipes-cli:v3.0.0-beta1
+image: docker://cognite/extpipes-cli:v3.0.0-beta2
env:
CDF_TOKEN_URL: ${{ inputs.token-url }}
CDF_PROJECT: ${{ inputs.cdf-project-name }}
11 changes: 1 addition & 10 deletions pyproject.toml
@@ -1,7 +1,7 @@
[tool.poetry]
# matching the name of the "script" hook, used from Dockerfile
name = "extpipes-cli"
version = "3.0.0-beta1"
version = "3.0.0-beta2"
description = "A CLI to deploy CDF Extraction Pipelines"
authors = ["Peter Arwanitis <[email protected]>", "Tugce Ozgur Oztetik <[email protected]>"]
license = "Apache-2.0"
@@ -55,12 +55,3 @@ extpipes-cli = "extpipes.__main__:main"
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"
-
-# 231005 pa: deact semver
-# [tool.semantic_release]
-# version_toml = [
-#     "pyproject.toml:tool.poetry.version"
-# ]
-version_variable = [
-    "src/extpipes/__init__.py:__version__",
-]
2 changes: 1 addition & 1 deletion src/extpipes/__init__.py
@@ -1 +1 @@
__version__ = "3.0.0-beta1"
__version__ = "3.0.0-beta2"
4 changes: 2 additions & 2 deletions src/extpipes/__main__.py
@@ -27,7 +27,7 @@
# ## dependencies are validated
# * `schedule` only supports: `On trigger | Continuous | <cron expression> | null`

-from typing import Dict, Optional
+from typing import Optional

import click
from click import Context
@@ -168,7 +168,7 @@ def extpipes_cli(
help="Delete extpipes which are not specified in config-file",
)
@click.pass_obj
-def deploy(obj: Dict, config_file: str, automatic_delete: bool = True) -> None:
+def deploy(obj: dict, config_file: str, automatic_delete: bool = True) -> None:
click.echo(click.style("Deploying Extraction Pipelines...", fg="green"))

try:
12 changes: 6 additions & 6 deletions src/extpipes/app_config.py
@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from enum import ReprEnum # new in 3.11
-from typing import Annotated, Dict, Optional, Set
+from typing import Annotated, Optional

from jinja2 import Environment, meta
from pydantic import Field, StringConstraints, field_validator, model_validator
@@ -45,15 +45,15 @@ class Pipeline(Model):
schedule: Annotated[str, StringConstraints(pattern=CRON_OR_FIXED_PATTERN)]
contacts: list[Contact] = Field(default=list())
source: Optional[str] = Field(default=None)
-metadata: Dict[str, str] = Field(default=dict())
+metadata: dict[str, str] = Field(default=dict())
documentation: Optional[str] = Field(default=None)
-created_by: Optional[str] = Field(default=None)
+created_by: Optional[str] = Field(default=f"dataops - extpipes-cli@v{__version__}")
raw_tables: list[RawTable] = Field(default=list())
-extpipe_config: Optional[Dict[str, str]] = Field(default=None)
+extpipe_config: Optional[dict[str, str]] = Field(default=None)

@field_validator("metadata")
@classmethod
-def ensure_metadata_to_have_version(cls, v: Dict[str, str]) -> Dict[str, str]:
+def ensure_metadata_to_have_version(cls, v: dict[str, str]) -> dict[str, str]:
if "Dataops_created" not in v:
v["Dataops_created"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if "Dataops_source" not in v:
@@ -108,7 +108,7 @@ def check_pattern_condition(self) -> "ExtpipesConfig":
logging.error(f"## Misconfigured pipelines name: {misconfigured}")
raise ValueError("With pattern provider, pipelines should not have names defined.")

-def extract_jinja_variables(template: str) -> Set[str]:
+def extract_jinja_variables(template: str) -> set[str]:
env = Environment()
parsed_content = env.parse(template)
# Extract all variables from the parsed content
4 changes: 2 additions & 2 deletions src/extpipes/app_container.py
@@ -1,7 +1,7 @@
import logging.config
import os
from pathlib import Path
-from typing import Dict, Optional, Type
+from typing import Optional, Type

from dependency_injector import containers, providers
from dotenv import load_dotenv
@@ -44,7 +44,7 @@ def init_container(
return container


-def init_logging(logging_config: Optional[Dict], deprecated_logger_config: Optional[Dict]):
+def init_logging(logging_config: Optional[dict], deprecated_logger_config: Optional[dict]):
# https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
# from logging-cookbook examples for 'logging_config' dict
# TODO: needed to handle missing log folders?
11 changes: 5 additions & 6 deletions src/extpipes/commands/base.py
@@ -1,7 +1,7 @@
import logging
import pprint
from pathlib import Path
-from typing import Dict, Self
+from typing import Self

from cognite.client import CogniteClient
from cognite.client.exceptions import CogniteNotFoundError
@@ -21,7 +21,6 @@ def __init__(
dry_run: bool,
dotenv_path: str | Path | None = None,
):
-
# validate and load config according to command-mode
ContainerCls = ContainerSelector[command]
self.container = init_container(ContainerCls, config_path=config_path, dotenv_path=dotenv_path)
@@ -78,7 +77,7 @@ def validate_config(self) -> Self:

def ensure_raw_tables(self):
# RAW
-def find_missing(existing: Dict, target: Dict) -> Dict:
+def find_missing(existing: dict, target: dict) -> dict:
missing = {}

for key, value_list in target.items():
@@ -94,20 +93,20 @@ def find_missing(existing: Dict, target: Dict) -> Dict:
return missing

# build dictionary of configured dbs:tables
-requested_raw_tables: Dict = {}
+requested_raw_tables: dict = {}
for pipeline in self.extpipes_config.pipelines:
for raw_table in pipeline.raw_tables:
if raw_table.db_name in requested_raw_tables:
requested_raw_tables[raw_table.db_name].append(raw_table.table_name)
else:
requested_raw_tables[raw_table.db_name] = [raw_table.table_name]
# get existing dbs/tables into a dict for fast lookup
-cdf_dbs: Dict = {}
+cdf_dbs: dict = {}
for db_name in {_db.name for _db in self.client.raw.databases.list(limit=None)}:
cdf_dbs[db_name] = [table.name for table in self.client.raw.tables.list(db_name=db_name, limit=None)]

missing = find_missing(cdf_dbs, requested_raw_tables)
if missing:
logging.warning(f"## Detected missing RAW tables: {pprint.pformat(missing)}")
for _db, _tables in missing.items():
-    self.client.raw.tables.create(db_name=_db, table=_tables)
+    self.client.raw.tables.create(db_name=_db, name=_tables)
94 changes: 54 additions & 40 deletions src/extpipes/commands/deploy.py
@@ -1,13 +1,16 @@
import logging
-from typing import Dict

-from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineContact
+from cognite.client.data_classes import (
+    ExtractionPipeline,
+    ExtractionPipelineContact,
+    ExtractionPipelineList,
+)
from jinja2 import Template

from .base import CommandBase


-def _render_template(template: str, metadata: Dict[str, str]) -> str:
+def _render_template(template: str, metadata: dict[str, str]) -> str:
# Create a new Jinja2 template from the given template string
jinja_template = Template(template)

@@ -19,64 +22,75 @@ def _render_template(template: str, metadata: dict[str, str]) -> str:
class CommandDeploy(CommandBase):
def command(self) -> None:
# get existing extpipes
-existing_extpipes = {ep.external_id: ep for ep in self.client.extraction_pipelines.list(limit=None)}
+existing_extpipes = self.client.extraction_pipelines.list(limit=-1)

logging.debug(f"{existing_extpipes.as_external_ids()=}")

# get requested from config
-requested_extpipes = {
-    pipeline.external_id: ExtractionPipeline(  # key # value
-        external_id=pipeline.external_id
-        if pipeline.external_id
-        else _render_template(self.naming_pattern, pipeline.metadata),
-        name=pipeline.name if pipeline.name else _render_template(self.naming_pattern, pipeline.metadata),
-        description=pipeline.description,
-        data_set_id=self.data_sets_in_scope.get(pipeline.data_set_external_id).id,  # type: ignore
-        raw_tables=[{"dbName": _t.db_name, "tableName": _t.table_name} for _t in pipeline.raw_tables],
-        schedule=pipeline.schedule,
-        contacts=[
-            ExtractionPipelineContact(
-                name=_c.name, email=_c.email, role=_c.role, send_notification=_c.send_notification
-            )
-            for _c in [*pipeline.contacts, *self.default_contacts]
-        ],
-        metadata=pipeline.metadata,
-        created_by=self.client._config.client_name,
-    )
-    for pipeline in self.extpipes_config.pipelines
-}
+requested_extpipes = ExtractionPipelineList(
+    [
+        ExtractionPipeline(  # key # value
+            external_id=pipeline.external_id
+            if pipeline.external_id
+            else _render_template(self.naming_pattern, pipeline.metadata),
+            name=pipeline.name if pipeline.name else _render_template(self.naming_pattern, pipeline.metadata),
+            description=pipeline.description,
+            data_set_id=self.data_sets_in_scope.get(pipeline.data_set_external_id).id,  # type: ignore
+            raw_tables=[{"dbName": _t.db_name, "tableName": _t.table_name} for _t in pipeline.raw_tables],
+            schedule=pipeline.schedule,
+            contacts=[
+                ExtractionPipelineContact(
+                    name=_c.name, email=_c.email, role=_c.role, send_notification=_c.send_notification
+                )
+                for _c in [*pipeline.contacts, *self.default_contacts]
+            ],
+            metadata=pipeline.metadata,
+            created_by=pipeline.created_by,
+        )
+        for pipeline in self.extpipes_config.pipelines
+    ]
+)

logging.debug(f"{requested_extpipes.as_external_ids()=}")

# Cognite SDK v6.30.1 does NOT support UPSERT (with ExtractionPipelines)
# build 3 lists create/update/delete
-create_extpipes = [
-    extpipe
-    for external_id, extpipe in requested_extpipes.items()
-    if external_id not in existing_extpipes.keys()
-]
-
-update_extpipes = [
-    extpipe for external_id, extpipe in requested_extpipes.items() if external_id in existing_extpipes.keys()
-]
+create_extpipes = ExtractionPipelineList(
+    [
+        extpipe
+        for extpipe in requested_extpipes
+        if extpipe.external_id not in existing_extpipes.as_external_ids()
+    ]
+)
+
+update_extpipes = ExtractionPipelineList(
+    [extpipe for extpipe in requested_extpipes if extpipe.external_id in existing_extpipes.as_external_ids()]
+)

delete_extpipes = [
-    external_id for external_id in existing_extpipes.keys() if external_id not in requested_extpipes.keys()
+    external_id
+    for external_id in existing_extpipes.as_external_ids()
+    if external_id not in requested_extpipes.as_external_ids()
]

if create_extpipes:
logging.info(f"## Extraction pipelines to create: {[extpipe.external_id for extpipe in create_extpipes]}")
logging.info(f"Extraction pipelines to create: {create_extpipes.as_external_ids()}")
if update_extpipes:
logging.info(f"## Extraction pipelines to update: {[extpipe.external_id for extpipe in update_extpipes]}")
logging.info(f"Extraction pipelines to update: {update_extpipes.as_external_ids()}")
if self.extpipes_config.features.automatic_delete and delete_extpipes:
logging.info(f"## Extraction pipelines to delete: {[extpipe.external_id for extpipe in update_extpipes]}")
logging.info(f"Extraction pipelines to delete: {delete_extpipes}")

if self.dry_run:
logging.warning("Dry run detected. No changes to be applied to CDF.")
return

logging.info("## Applying configuration")
logging.info("Applying configuration")

self.ensure_raw_tables()

if self.extpipes_config.features.automatic_delete and delete_extpipes:
logging.info(f"## Extraction pipelines to delete: {delete_extpipes}")
self.client.extraction_pipelines.delete(external_id=delete_extpipes)
logging.info(f"Extraction Pipelines deleted: {len(delete_extpipes)}")

if create_extpipes:
res = self.client.extraction_pipelines.create(create_extpipes)
6 changes: 3 additions & 3 deletions src/extpipes/common/base_model.py
@@ -1,4 +1,4 @@
-from typing import Tuple, Type
+from typing import Type

from pydantic_settings import (
BaseSettings,
@@ -22,7 +22,7 @@ def to_hyphen_case(value: str) -> str:

class Model(BaseSettings):
model_config = SettingsConfigDict(
extra="ignore",
extra="forbid",
# generate for each field an alias in hyphen-case (kebap)
alias_generator=to_hyphen_case,
# an aliased field may be populated by its name as given by the model attribute, as well as the alias
@@ -38,7 +38,7 @@ def settings_customise_sources(
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
-) -> Tuple[PydanticBaseSettingsSource, ...]:
+) -> tuple[PydanticBaseSettingsSource, ...]:
# here we choose to ignore env_settings or dotenv_settings
# to avoid unecpectd expansion of pydantic properties matching an envvar
# all envvar expansion exlcusivly happens in the dependency-injector
