
[REFACTOR] Clean and organise data processing #405

Open · 3 tasks
HAEKADI opened this issue Oct 17, 2024 · 10 comments

@HAEKADI commented Oct 17, 2024

@HAEKADI commented Oct 25, 2024

The target organisation for the repository will look something like this:

dag_datalake_sirene/
│
├── config/
│   ├── __init__.py
│   ├── constants.py  # Global constants
│   └── settings.py   # Environment-specific settings
│
├── helpers/
│   ├── __init__.py
│   ├── data_processor.py
│   ├── config_models.py
│   ├── minio_helpers.py
│   ├── tchap.py
│   ├── utils.py
│   └── sqlite_client.py
│
├── workflows/
│   ├── __init__.py
│   │
│   ├── data_pipelines/
│   │   ├── __init__.py
│   │   │
│   │   ├── egapro/
│   │   │   ├── __init__.py
│   │   │   ├── dag_egapro.py
│   │   │   ├── egapro_processor.py
│   │   │   └── task_functions.py
│   │   │
│   │   ├── rge/
│   │   │   ├── __init__.py
│   │   │   ├── dag_rge.py
│   │   │   ├── rge_processor.py
│   │   │   └── task_functions.py
│   │   │
│   │   ├── colter/
│   │   │   ├── __init__.py
│   │   │   ├── dag_colter.py
│   │   │   ├── colter_processor.py
│   │   │   └── task_functions.py
│   │   │
│   │   └── ... (other data sources)
│   │
│   └── common/
│       ├── __init__.py
│       └── common_tasks.py  # Tasks that might be shared across DAGs
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   │
│   ├── helpers/
│   │   ├── __init__.py
│   │   ├── test_data_processor.py
│   │   └── ... (tests for other helpers)
│   │
│   └── workflows/
│       ├── __init__.py
│       │
│       └── data_pipelines/
│           ├── __init__.py
│           ├── test_egapro.py
│           ├── test_rge.py
│           └── ... (tests for other data pipelines)
│
├── requirements.txt
├── setup.py
└── README.md
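
To make the layout concrete, here is a hypothetical sketch of how `dag_egapro.py` could wire up functions from its sibling `task_functions.py` (module path, function names, and scheduling are assumptions, not the actual code):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumed location under the proposed layout.
from dag_datalake_sirene.workflows.data_pipelines.egapro.task_functions import (
    download_data,
    process_data,
    save_to_minio,
)

with DAG(
    dag_id="data_processing_egapro",
    start_date=datetime(2024, 10, 1),
    schedule=None,
    catchup=False,
) as dag:
    download = PythonOperator(task_id="download_data", python_callable=download_data)
    process = PythonOperator(task_id="process_data", python_callable=process_data)
    upload = PythonOperator(task_id="save_to_minio", python_callable=save_to_minio)

    # Linear pipeline: fetch, transform, then publish.
    download >> process >> upload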

@HAEKADI changed the title from "[REFACTOR]Clean data ressource processing" to "[REFACTOR]Clean and organise data processing" on Oct 25, 2024
@HAEKADI commented Oct 25, 2024

This refactoring would involve creating a universal client or framework that can handle different data sources with similar processing patterns. Something that would look like this:

from abc import ABC, abstractmethod

# Assuming these helpers live in the helpers/ package shown in the tree above.
from helpers.minio_helpers import minio_client
from helpers.tchap import send_message


class DataProcessor(ABC):
    """Base class shared by every data source pipeline."""

    def __init__(self, config):
        self.config = config
        self.minio_client = minio_client

    @abstractmethod
    def download_data(self):
        pass

    @abstractmethod
    def process_data(self):
        pass

    @abstractmethod
    def save_to_minio(self):
        pass

    @abstractmethod
    def compare_files_minio(self):
        pass

    def send_notification(self, message):
        send_message(message)
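
For illustration, a concrete subclass for a simple source could look like the sketch below (method bodies, config fields, and helper calls are assumptions, not the actual implementation):

import requests


class EgaproProcessor(DataProcessor):
    """Hypothetical processor for the EGAPRO source."""

    def download_data(self):
        # Fetch the raw file from the upstream URL defined in the config.
        response = requests.get(self.config.url, timeout=60)
        response.raise_for_status()
        with open(self.config.local_path, "wb") as f:
            f.write(response.content)

    def process_data(self):
        # Source-specific cleaning and transformations would go here.
        ...

    def save_to_minio(self):
        # Push the processed file to the bucket path from the config.
        ...

    def compare_files_minio(self):
        # Diff the new file against the latest version already in MinIO.
        ...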

This will be a huge undertaking given the size of this codebase.

Here's a step-by-step plan that focuses on gradually refactoring configuration management, then using a universal client for processing data sources, without drastically changing the existing structure:

  1. Introduce data classes for configuration management (see the sketch below)
  2. Create a universal data processing client (like shown above)
  3. Refactor one data source using data classes and DataProcessor
  4. Gradually refactor other data sources (each data source gets a PR)
  5. Enhance DataProcessor as needed
  6. Update tests
  7. Update documentation

To minimise bugs and simplify review, each step will preferably have its own PR.
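
For step 1, a minimal sketch of such a configuration data class, assuming purely illustrative field names:

from dataclasses import dataclass


@dataclass(frozen=True)
class DataSourceConfig:
    # Illustrative fields only; the real config will differ per source.
    name: str        # e.g. "egapro"
    url: str         # upstream download URL
    local_path: str  # local working file path
    minio_path: str  # destination path in the MinIO bucket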

@HAEKADI commented Oct 25, 2024

@XavierJp Any feedback so far?

@XavierJp

It is... beautiful 🥹

@XavierJp

Honestly, this sounds very relevant. Step by step is always a good choice. Will the migration start with the most complicated clients or the simpler ones?

@HAEKADI commented Oct 25, 2024

I’m thinking of implementing an initial version of the client focused on a straightforward data source, such as EGAPRO (see related draft).

Then each PR will add a new data source, potentially introducing additional layers of complexity.

This is very much a work in progress; many improvements are coming (so don't mind the naming conventions, etc.).

@HAEKADI commented Oct 25, 2024

@XavierJp What do you think?

@XavierJp

Totally agree. You could even do one or two basic sources, then the hard ones like INSEE and RNE, thus ensuring the model is both straightforward and flexible enough.

@HAEKADI commented Nov 22, 2024

Potential enhancements:

  • Add a MinIOFile and an AirflowFile to distinguish the types of files being processed.
  • Use dataclasses and Pydantic when possible instead of TypedDict.
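
A minimal sketch of that distinction, assuming plain dataclasses and illustrative fields:

from dataclasses import dataclass


@dataclass
class AirflowFile:
    # A file on the Airflow worker's local filesystem.
    local_path: str


@dataclass
class MinIOFile:
    # A file stored in a MinIO bucket.
    bucket: str
    key: str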

HAEKADI added a commit that referenced this issue Jan 24, 2025
hacherix added a commit that referenced this issue Jan 27, 2025
Related to #405

This PR creates the DatabaseTableProcessor class so it can be used as a generic tool to create the SQLite tables.

AgenceBio is the first data source to use this new class. We will refactor the other data sources in a second step if we are OK with the implementation.

It does not work for RNE and SIRENE yet. We will tackle those later.

Current implementation design:
1. Move any data transformations done in `etl` back into the relevant DAG.
2. Add a `table_ddl` entry to the relevant config.
3. Use the generic DatabaseTableProcessor methods to download the file from MinIO and upload it into the SQLite database.

The whole `data_fetch_clean` and `sqlite` folders should disappear as a
result.

Note about `dag.py`:
PythonOperator is still in use so we can easily identify the tasks that need to be refactored.

All task instances had to be renamed because, with `@dag`, the instance's name conflicts with the callable's name.
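
For context, the `table_ddl` entry from step 2 could plausibly be a plain DDL string attached to the source's config; a purely hypothetical example (the real AgenceBio schema may differ):

# Hypothetical value for a `table_ddl` config entry.
AGENCE_BIO_TABLE_DDL = """
CREATE TABLE IF NOT EXISTS agence_bio (
    siren TEXT,
    id_bio TEXT,
    etat_certification TEXT
);
"""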