Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update base.py #30

Merged
merged 6 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ inputs:
required: true
runs:
using: "docker"
image: docker://cognite/extpipes-cli:v3.0.0-beta1
image: docker://cognite/extpipes-cli:v3.0.0-beta2
env:
CDF_TOKEN_URL: ${{ inputs.token-url }}
CDF_PROJECT: ${{ inputs.cdf-project-name }}
Expand Down
11 changes: 1 addition & 10 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
# matching the name of the "script" hook, used from Dockerfile
name = "extpipes-cli"
version = "3.0.0-beta1"
version = "3.0.0-beta2"
description = "A CLI to deploy CDF Extraction Pipelines"
authors = ["Peter Arwanitis <[email protected]>", "Tugce Ozgur Oztetik <[email protected]>"]
license = "Apache-2.0"
Expand Down Expand Up @@ -55,12 +55,3 @@ extpipes-cli = "extpipes.__main__:main"
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"

# 231005 pa: deact semver
# [tool.semantic_release]
# version_toml = [
# "pyproject.toml:tool.poetry.version"
# ]
version_variable = [
"src/extpipes/__init__.py:__version__",
]
2 changes: 1 addition & 1 deletion src/extpipes/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "3.0.0-beta1"
__version__ = "3.0.0-beta2"
4 changes: 2 additions & 2 deletions src/extpipes/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
# ## dependencies are validated
# * `schedule` only supports: `On trigger | Continuous | <cron expression> | null`

from typing import Dict, Optional
from typing import Optional

import click
from click import Context
Expand Down Expand Up @@ -168,7 +168,7 @@ def extpipes_cli(
help="Delete extpipes which are not specified in config-file",
)
@click.pass_obj
def deploy(obj: Dict, config_file: str, automatic_delete: bool = True) -> None:
def deploy(obj: dict, config_file: str, automatic_delete: bool = True) -> None:
click.echo(click.style("Deploying Extraction Pipelines...", fg="green"))

try:
Expand Down
12 changes: 6 additions & 6 deletions src/extpipes/app_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from enum import ReprEnum # new in 3.11
from typing import Annotated, Dict, Optional, Set
from typing import Annotated, Optional

from jinja2 import Environment, meta
from pydantic import Field, StringConstraints, field_validator, model_validator
Expand Down Expand Up @@ -45,15 +45,15 @@ class Pipeline(Model):
schedule: Annotated[str, StringConstraints(pattern=CRON_OR_FIXED_PATTERN)]
contacts: list[Contact] = Field(default=list())
source: Optional[str] = Field(default=None)
metadata: Dict[str, str] = Field(default=dict())
metadata: dict[str, str] = Field(default=dict())
documentation: Optional[str] = Field(default=None)
created_by: Optional[str] = Field(default=None)
created_by: Optional[str] = Field(default=f"dataops - extpipes-cli@v{__version__}")
raw_tables: list[RawTable] = Field(default=list())
extpipe_config: Optional[Dict[str, str]] = Field(default=None)
extpipe_config: Optional[dict[str, str]] = Field(default=None)

@field_validator("metadata")
@classmethod
def ensure_metadata_to_have_version(cls, v: Dict[str, str]) -> Dict[str, str]:
def ensure_metadata_to_have_version(cls, v: dict[str, str]) -> dict[str, str]:
if "Dataops_created" not in v:
v["Dataops_created"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if "Dataops_source" not in v:
Expand Down Expand Up @@ -108,7 +108,7 @@ def check_pattern_condition(self) -> "ExtpipesConfig":
logging.error(f"## Misconfigured pipelines name: {misconfigured}")
raise ValueError("With pattern provider, pipelines should not have names defined.")

def extract_jinja_variables(template: str) -> Set[str]:
def extract_jinja_variables(template: str) -> set[str]:
env = Environment()
parsed_content = env.parse(template)
# Extract all variables from the parsed content
Expand Down
4 changes: 2 additions & 2 deletions src/extpipes/app_container.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging.config
import os
from pathlib import Path
from typing import Dict, Optional, Type
from typing import Optional, Type

from dependency_injector import containers, providers
from dotenv import load_dotenv
Expand Down Expand Up @@ -44,7 +44,7 @@ def init_container(
return container


def init_logging(logging_config: Optional[Dict], deprecated_logger_config: Optional[Dict]):
def init_logging(logging_config: Optional[dict], deprecated_logger_config: Optional[dict]):
# https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
# from logging-cookbook examples for 'logging_config' dict
# TODO: needed to handle missing log folders?
Expand Down
11 changes: 5 additions & 6 deletions src/extpipes/commands/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import pprint
from pathlib import Path
from typing import Dict, Self
from typing import Self

from cognite.client import CogniteClient
from cognite.client.exceptions import CogniteNotFoundError
Expand All @@ -21,7 +21,6 @@ def __init__(
dry_run: bool,
dotenv_path: str | Path | None = None,
):

# validate and load config according to command-mode
ContainerCls = ContainerSelector[command]
self.container = init_container(ContainerCls, config_path=config_path, dotenv_path=dotenv_path)
Expand Down Expand Up @@ -78,7 +77,7 @@ def validate_config(self) -> Self:

def ensure_raw_tables(self):
# RAW
def find_missing(existing: Dict, target: Dict) -> Dict:
def find_missing(existing: dict, target: dict) -> dict:
missing = {}

for key, value_list in target.items():
Expand All @@ -94,20 +93,20 @@ def find_missing(existing: Dict, target: Dict) -> Dict:
return missing

# build dictionary of configured dbs:tables
requested_raw_tables: Dict = {}
requested_raw_tables: dict = {}
for pipeline in self.extpipes_config.pipelines:
for raw_table in pipeline.raw_tables:
if raw_table.db_name in requested_raw_tables:
requested_raw_tables[raw_table.db_name].append(raw_table.table_name)
else:
requested_raw_tables[raw_table.db_name] = [raw_table.table_name]
# get existing dbs/tables into a dict for fast lookup
cdf_dbs: Dict = {}
cdf_dbs: dict = {}
for db_name in {_db.name for _db in self.client.raw.databases.list(limit=None)}:
cdf_dbs[db_name] = [table.name for table in self.client.raw.tables.list(db_name=db_name, limit=None)]

missing = find_missing(cdf_dbs, requested_raw_tables)
if missing:
logging.warning(f"## Detected missing RAW tables: {pprint.pformat(missing)}")
for _db, _tables in missing.items():
self.client.raw.tables.create(db_name=_db, table=_tables)
self.client.raw.tables.create(db_name=_db, name=_tables)
94 changes: 54 additions & 40 deletions src/extpipes/commands/deploy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import logging
from typing import Dict

from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineContact
from cognite.client.data_classes import (
ExtractionPipeline,
ExtractionPipelineContact,
ExtractionPipelineList,
)
from jinja2 import Template

from .base import CommandBase


def _render_template(template: str, metadata: Dict[str, str]) -> str:
def _render_template(template: str, metadata: dict[str, str]) -> str:
# Create a new Jinja2 template from the given template string
jinja_template = Template(template)

Expand All @@ -19,64 +22,75 @@ def _render_template(template: str, metadata: Dict[str, str]) -> str:
class CommandDeploy(CommandBase):
def command(self) -> None:
# get existing extpipes
existing_extpipes = {ep.external_id: ep for ep in self.client.extraction_pipelines.list(limit=None)}
existing_extpipes = self.client.extraction_pipelines.list(limit=-1)

logging.debug(f"{existing_extpipes.as_external_ids()=}")

# get requested from config
requested_extpipes = {
pipeline.external_id: ExtractionPipeline( # key # value
external_id=pipeline.external_id
if pipeline.external_id
else _render_template(self.naming_pattern, pipeline.metadata),
name=pipeline.name if pipeline.name else _render_template(self.naming_pattern, pipeline.metadata),
description=pipeline.description,
data_set_id=self.data_sets_in_scope.get(pipeline.data_set_external_id).id, # type: ignore
raw_tables=[{"dbName": _t.db_name, "tableName": _t.table_name} for _t in pipeline.raw_tables],
schedule=pipeline.schedule,
contacts=[
ExtractionPipelineContact(
name=_c.name, email=_c.email, role=_c.role, send_notification=_c.send_notification
)
for _c in [*pipeline.contacts, *self.default_contacts]
],
metadata=pipeline.metadata,
created_by=self.client._config.client_name,
)
for pipeline in self.extpipes_config.pipelines
}
requested_extpipes = ExtractionPipelineList(
[
ExtractionPipeline( # key # value
external_id=pipeline.external_id
if pipeline.external_id
else _render_template(self.naming_pattern, pipeline.metadata),
name=pipeline.name if pipeline.name else _render_template(self.naming_pattern, pipeline.metadata),
description=pipeline.description,
data_set_id=self.data_sets_in_scope.get(pipeline.data_set_external_id).id, # type: ignore
raw_tables=[{"dbName": _t.db_name, "tableName": _t.table_name} for _t in pipeline.raw_tables],
schedule=pipeline.schedule,
contacts=[
ExtractionPipelineContact(
name=_c.name, email=_c.email, role=_c.role, send_notification=_c.send_notification
)
for _c in [*pipeline.contacts, *self.default_contacts]
],
metadata=pipeline.metadata,
created_by=pipeline.created_by,
)
for pipeline in self.extpipes_config.pipelines
]
)

logging.debug(f"{requested_extpipes.as_external_ids()=}")

# Cognite SDK v6.30.1 does NOT support UPSERT (with ExtractionPipelines)
# build 3 lists create/update/delete
create_extpipes = [
extpipe
for external_id, extpipe in requested_extpipes.items()
if external_id not in existing_extpipes.keys()
]

update_extpipes = [
extpipe for external_id, extpipe in requested_extpipes.items() if external_id in existing_extpipes.keys()
]
create_extpipes = ExtractionPipelineList(
[
extpipe
for extpipe in requested_extpipes
if extpipe.external_id not in existing_extpipes.as_external_ids()
]
)

update_extpipes = ExtractionPipelineList(
[extpipe for extpipe in requested_extpipes if extpipe.external_id in existing_extpipes.as_external_ids()]
)

delete_extpipes = [
external_id for external_id in existing_extpipes.keys() if external_id not in requested_extpipes.keys()
external_id
for external_id in existing_extpipes.as_external_ids()
if external_id not in requested_extpipes.as_external_ids()
]

if create_extpipes:
logging.info(f"## Extraction pipelines to create: {[extpipe.external_id for extpipe in create_extpipes]}")
logging.info(f"Extraction pipelines to create: {create_extpipes.as_external_ids()}")
if update_extpipes:
logging.info(f"## Extraction pipelines to update: {[extpipe.external_id for extpipe in update_extpipes]}")
logging.info(f"Extraction pipelines to update: {update_extpipes.as_external_ids()}")
if self.extpipes_config.features.automatic_delete and delete_extpipes:
logging.info(f"## Extraction pipelines to delete: {[extpipe.external_id for extpipe in update_extpipes]}")
logging.info(f"Extraction pipelines to delete: {delete_extpipes}")

if self.dry_run:
logging.warning("Dry run detected. No changes to be applied to CDF.")
return

logging.info("## Applying configuration")
logging.info("Applying configuration")

self.ensure_raw_tables()

if self.extpipes_config.features.automatic_delete and delete_extpipes:
logging.info(f"## Extraction pipelines to delete: {delete_extpipes}")
self.client.extraction_pipelines.delete(external_id=delete_extpipes)
logging.info(f"Extraction Pipelines deleted: {len(delete_extpipes)}")

if create_extpipes:
res = self.client.extraction_pipelines.create(create_extpipes)
Expand Down
6 changes: 3 additions & 3 deletions src/extpipes/common/base_model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Tuple, Type
from typing import Type

from pydantic_settings import (
BaseSettings,
Expand All @@ -22,7 +22,7 @@ def to_hyphen_case(value: str) -> str:

class Model(BaseSettings):
model_config = SettingsConfigDict(
extra="ignore",
extra="forbid",
# generate for each field an alias in hyphen-case (kebab-case)
alias_generator=to_hyphen_case,
# an aliased field may be populated by its name as given by the model attribute, as well as the alias
Expand All @@ -38,7 +38,7 @@ def settings_customise_sources(
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> Tuple[PydanticBaseSettingsSource, ...]:
) -> tuple[PydanticBaseSettingsSource, ...]:
# here we choose to ignore env_settings or dotenv_settings
# to avoid unexpected expansion of pydantic properties matching an envvar
# all envvar expansion exclusively happens in the dependency-injector
Expand Down