diff --git a/action.yml b/action.yml index 7a38c5d..3b339ed 100644 --- a/action.yml +++ b/action.yml @@ -31,7 +31,7 @@ inputs: required: true runs: using: "docker" - image: docker://cognite/extpipes-cli:v3.0.0-beta1 + image: docker://cognite/extpipes-cli:v3.0.0-beta2 env: CDF_TOKEN_URL: ${{ inputs.token-url }} CDF_PROJECT: ${{ inputs.cdf-project-name }} diff --git a/pyproject.toml b/pyproject.toml index 0299e48..a5708af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] # matching the name of the "script" hook, used from Dockerfile name = "extpipes-cli" -version = "3.0.0-beta1" +version = "3.0.0-beta2" description = "A CLI to deploy CDF Extraction Pipelines" authors = ["Peter Arwanitis ", "Tugce Ozgur Oztetik "] license = "Apache-2.0" @@ -55,12 +55,3 @@ extpipes-cli = "extpipes.__main__:main" [build-system] requires = ["poetry>=0.12"] build-backend = "poetry.masonry.api" - -# 231005 pa: deact semver -# [tool.semantic_release] -# version_toml = [ -# "pyproject.toml:tool.poetry.version" -# ] -version_variable = [ - "src/extpipes/__init__.py:__version__", -] diff --git a/src/extpipes/__init__.py b/src/extpipes/__init__.py index 39f9f42..d956949 100644 --- a/src/extpipes/__init__.py +++ b/src/extpipes/__init__.py @@ -1 +1 @@ -__version__ = "3.0.0-beta1" +__version__ = "3.0.0-beta2" diff --git a/src/extpipes/__main__.py b/src/extpipes/__main__.py index c8901b2..f1d2f1a 100644 --- a/src/extpipes/__main__.py +++ b/src/extpipes/__main__.py @@ -27,7 +27,7 @@ # ## dependencies are validated # * `schedule` only supports: `On trigger | Continuous | | null` -from typing import Dict, Optional +from typing import Optional import click from click import Context @@ -168,7 +168,7 @@ def extpipes_cli( help="Delete extpipes which are not specified in config-file", ) @click.pass_obj -def deploy(obj: Dict, config_file: str, automatic_delete: bool = True) -> None: +def deploy(obj: dict, config_file: str, automatic_delete: bool = True) -> None: click.echo(click.style("Deploying Extraction Pipelines...", fg="green")) try: diff --git a/src/extpipes/app_config.py b/src/extpipes/app_config.py index 73350d0..9622b30 100644 --- a/src/extpipes/app_config.py +++ b/src/extpipes/app_config.py @@ -1,7 +1,7 @@ import logging from datetime import datetime from enum import ReprEnum # new in 3.11 -from typing import Annotated, Dict, Optional, Set +from typing import Annotated, Optional from jinja2 import Environment, meta from pydantic import Field, StringConstraints, field_validator, model_validator @@ -45,15 +45,15 @@ class Pipeline(Model): schedule: Annotated[str, StringConstraints(pattern=CRON_OR_FIXED_PATTERN)] contacts: list[Contact] = Field(default=list()) source: Optional[str] = Field(default=None) - metadata: Dict[str, str] = Field(default=dict()) + metadata: dict[str, str] = Field(default=dict()) documentation: Optional[str] = Field(default=None) - created_by: Optional[str] = Field(default=None) + created_by: Optional[str] = Field(default=f"dataops - extpipes-cli@v{__version__}") raw_tables: list[RawTable] = Field(default=list()) - extpipe_config: Optional[Dict[str, str]] = Field(default=None) + extpipe_config: Optional[dict[str, str]] = Field(default=None) @field_validator("metadata") @classmethod - def ensure_metadata_to_have_version(cls, v: Dict[str, str]) -> Dict[str, str]: + def ensure_metadata_to_have_version(cls, v: dict[str, str]) -> dict[str, str]: if "Dataops_created" not in v: v["Dataops_created"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if "Dataops_source" not in v: @@ -108,7 +108,7 @@ def check_pattern_condition(self) -> "ExtpipesConfig": logging.error(f"## Misconfigured pipelines name: {misconfigured}") raise ValueError("With pattern provider, pipelines should not have names defined.") - def extract_jinja_variables(template: str) -> Set[str]: + def extract_jinja_variables(template: str) -> set[str]: env = Environment() parsed_content = env.parse(template) # Extract all variables from the parsed content diff --git a/src/extpipes/app_container.py b/src/extpipes/app_container.py index 1aa02b9..2c5c1f4 100644 --- a/src/extpipes/app_container.py +++ b/src/extpipes/app_container.py @@ -1,7 +1,7 @@ import logging.config import os from pathlib import Path -from typing import Dict, Optional, Type +from typing import Optional, Type from dependency_injector import containers, providers from dotenv import load_dotenv @@ -44,7 +44,7 @@ def init_container( return container -def init_logging(logging_config: Optional[Dict], deprecated_logger_config: Optional[Dict]): +def init_logging(logging_config: Optional[dict], deprecated_logger_config: Optional[dict]): # https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes # from logging-cookbook examples for 'logging_config' dict # TODO: needed to handle missing log folders? diff --git a/src/extpipes/commands/base.py b/src/extpipes/commands/base.py index c30ad16..e0a925f 100644 --- a/src/extpipes/commands/base.py +++ b/src/extpipes/commands/base.py @@ -1,7 +1,7 @@ import logging import pprint from pathlib import Path -from typing import Dict, Self +from typing import Self from cognite.client import CogniteClient from cognite.client.exceptions import CogniteNotFoundError @@ -21,7 +21,6 @@ def __init__( dry_run: bool, dotenv_path: str | Path | None = None, ): - # validate and load config according to command-mode ContainerCls = ContainerSelector[command] self.container = init_container(ContainerCls, config_path=config_path, dotenv_path=dotenv_path) @@ -78,7 +77,7 @@ def validate_config(self) -> Self: def ensure_raw_tables(self): # RAW - def find_missing(existing: Dict, target: Dict) -> Dict: + def find_missing(existing: dict, target: dict) -> dict: missing = {} for key, value_list in target.items(): @@ -94,7 +93,7 @@ def find_missing(existing: Dict, target: Dict) -> Dict: return missing # build dictionary of configured dbs:tables - requested_raw_tables: Dict = {} + requested_raw_tables: dict = {} for pipeline in self.extpipes_config.pipelines: for raw_table in pipeline.raw_tables: if raw_table.db_name in requested_raw_tables: @@ -102,7 +101,7 @@ def find_missing(existing: Dict, target: Dict) -> Dict: else: requested_raw_tables[raw_table.db_name] = [raw_table.table_name] # get existing dbs/tables into a dict for fast lookup - cdf_dbs: Dict = {} + cdf_dbs: dict = {} for db_name in {_db.name for _db in self.client.raw.databases.list(limit=None)}: cdf_dbs[db_name] = [table.name for table in self.client.raw.tables.list(db_name=db_name, limit=None)] @@ -110,4 +109,4 @@ def find_missing(existing: Dict, target: Dict) -> Dict: if missing: logging.warning(f"## Detected missing RAW tables: {pprint.pformat(missing)}") for _db, _tables in missing.items(): - self.client.raw.tables.create(db_name=_db, table=_tables) + self.client.raw.tables.create(db_name=_db, name=_tables) diff --git a/src/extpipes/commands/deploy.py b/src/extpipes/commands/deploy.py index 2960a0e..5d28595 100644 --- a/src/extpipes/commands/deploy.py +++ b/src/extpipes/commands/deploy.py @@ -1,13 +1,16 @@ import logging -from typing import Dict -from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineContact +from cognite.client.data_classes import ( + ExtractionPipeline, + ExtractionPipelineContact, + ExtractionPipelineList, +) from jinja2 import Template from .base import CommandBase -def _render_template(template: str, metadata: Dict[str, str]) -> str: +def _render_template(template: str, metadata: dict[str, str]) -> str: # Create a new Jinja2 template from the given template string jinja_template = Template(template) @@ -19,64 +22,75 @@ def _render_template(template: str, metadata: Dict[str, str]) -> str: class CommandDeploy(CommandBase): def command(self) -> None: # get existing extpipes - existing_extpipes = {ep.external_id: ep for ep in self.client.extraction_pipelines.list(limit=None)} + existing_extpipes = self.client.extraction_pipelines.list(limit=-1) + + logging.debug(f"{existing_extpipes.as_external_ids()=}") # get requested from config - requested_extpipes = { - pipeline.external_id: ExtractionPipeline( # key # value - external_id=pipeline.external_id - if pipeline.external_id - else _render_template(self.naming_pattern, pipeline.metadata), - name=pipeline.name if pipeline.name else _render_template(self.naming_pattern, pipeline.metadata), - description=pipeline.description, - data_set_id=self.data_sets_in_scope.get(pipeline.data_set_external_id).id, # type: ignore - raw_tables=[{"dbName": _t.db_name, "tableName": _t.table_name} for _t in pipeline.raw_tables], - schedule=pipeline.schedule, - contacts=[ - ExtractionPipelineContact( - name=_c.name, email=_c.email, role=_c.role, send_notification=_c.send_notification - ) - for _c in [*pipeline.contacts, *self.default_contacts] - ], - metadata=pipeline.metadata, - created_by=self.client._config.client_name, - ) - for pipeline in self.extpipes_config.pipelines - } + requested_extpipes = ExtractionPipelineList( + [ + ExtractionPipeline( # key # value + external_id=pipeline.external_id + if pipeline.external_id + else _render_template(self.naming_pattern, pipeline.metadata), + name=pipeline.name if pipeline.name else _render_template(self.naming_pattern, pipeline.metadata), + description=pipeline.description, + data_set_id=self.data_sets_in_scope.get(pipeline.data_set_external_id).id, # type: ignore + raw_tables=[{"dbName": _t.db_name, "tableName": _t.table_name} for _t in pipeline.raw_tables], + schedule=pipeline.schedule, + contacts=[ + ExtractionPipelineContact( + name=_c.name, email=_c.email, role=_c.role, send_notification=_c.send_notification + ) + for _c in [*pipeline.contacts, *self.default_contacts] + ], + metadata=pipeline.metadata, + created_by=pipeline.created_by, + ) + for pipeline in self.extpipes_config.pipelines + ] + ) + + logging.debug(f"{requested_extpipes.as_external_ids()=}") + # Cognite SDK v6.30.1 does NOT support UPSERT (with ExtractionPipelines) # build 3 lists create/update/delete - create_extpipes = [ - extpipe - for external_id, extpipe in requested_extpipes.items() - if external_id not in existing_extpipes.keys() - ] - - update_extpipes = [ - extpipe for external_id, extpipe in requested_extpipes.items() if external_id in existing_extpipes.keys() - ] + create_extpipes = ExtractionPipelineList( + [ + extpipe + for extpipe in requested_extpipes + if extpipe.external_id not in existing_extpipes.as_external_ids() + ] + ) + + update_extpipes = ExtractionPipelineList( + [extpipe for extpipe in requested_extpipes if extpipe.external_id in existing_extpipes.as_external_ids()] + ) delete_extpipes = [ - external_id for external_id in existing_extpipes.keys() if external_id not in requested_extpipes.keys() + external_id + for external_id in existing_extpipes.as_external_ids() + if external_id not in requested_extpipes.as_external_ids() ] if create_extpipes: - logging.info(f"## Extraction pipelines to create: {[extpipe.external_id for extpipe in create_extpipes]}") + logging.info(f"Extraction pipelines to create: {create_extpipes.as_external_ids()}") if update_extpipes: - logging.info(f"## Extraction pipelines to update: {[extpipe.external_id for extpipe in update_extpipes]}") + logging.info(f"Extraction pipelines to update: {update_extpipes.as_external_ids()}") if self.extpipes_config.features.automatic_delete and delete_extpipes: - logging.info(f"## Extraction pipelines to delete: {[extpipe.external_id for extpipe in update_extpipes]}") + logging.info(f"Extraction pipelines to delete: {delete_extpipes}") if self.dry_run: logging.warning("Dry run detected. No changes to be applied to CDF.") return - logging.info("## Applying configuration") + logging.info("Applying configuration") self.ensure_raw_tables() if self.extpipes_config.features.automatic_delete and delete_extpipes: - logging.info(f"## Extraction pipelines to delete: {delete_extpipes}") self.client.extraction_pipelines.delete(external_id=delete_extpipes) + logging.info(f"Extraction Pipelines deleted: {len(delete_extpipes)}") if create_extpipes: res = self.client.extraction_pipelines.create(create_extpipes) diff --git a/src/extpipes/common/base_model.py b/src/extpipes/common/base_model.py index 81085e6..078ff75 100644 --- a/src/extpipes/common/base_model.py +++ b/src/extpipes/common/base_model.py @@ -1,4 +1,4 @@ -from typing import Tuple, Type +from typing import Type from pydantic_settings import ( BaseSettings, @@ -22,7 +22,7 @@ def to_hyphen_case(value: str) -> str: class Model(BaseSettings): model_config = SettingsConfigDict( - extra="ignore", + extra="forbid", # generate for each field an alias in hyphen-case (kebap) alias_generator=to_hyphen_case, # an aliased field may be populated by its name as given by the model attribute, as well as the alias @@ -38,7 +38,7 @@ def settings_customise_sources( env_settings: PydanticBaseSettingsSource, dotenv_settings: PydanticBaseSettingsSource, file_secret_settings: PydanticBaseSettingsSource, - ) -> Tuple[PydanticBaseSettingsSource, ...]: + ) -> tuple[PydanticBaseSettingsSource, ...]: # here we choose to ignore env_settings or dotenv_settings # to avoid unecpectd expansion of pydantic properties matching an envvar # all envvar expansion exlcusivly happens in the dependency-injector