Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REFACTOR][ESS-FRANCE]Introduce a data processor for ESS France #431

Merged
merged 10 commits into from
Nov 29, 2024

Conversation

HAEKADI
Copy link
Contributor

@HAEKADI HAEKADI commented Nov 28, 2024

related to #405

This pull request introduces the implementation of the EssFranceProcessor class, configuration settings, and the update of processing DAG.

It also introduces the use of a private method in DataProcessor to push to XCom the unique count of any column in dataset.

helpers/data_processor.py Outdated Show resolved Hide resolved
@HAEKADI
Copy link
Contributor Author

HAEKADI commented Nov 29, 2024

@hacherix I am also wondering if we should integrate a utility method in DataProcessor as well, something like:

 @staticmethod
    def pull_from_xcom(key, task_ids):
        ti = get_current_context()["ti"]
        return ti.xcom_pull(key=key, task_ids=task_ids)

or have a utility class for Airflow or if it's a bit overkill. Because the the Notification push and pull is kind of heavy here :

def send_file_to_minio(self):
        super().send_file_to_minio()
        ti = get_current_context()["ti"]
        nb_siren = ti.xcom_pull(key="nb_siren_egapro", task_ids="preprocess_egapro")
        message = f"{nb_siren} unités légales."
        ti.xcom_push(key=Notification.notification_xcom_key, value=message)

What do you think ?

@hacherix
Copy link
Contributor

What do you think ?

Good idea! I like the second option best.
Having a function that takes the same number of arguments than xcom_pull/xcom_push seems overkill indeed 😄
But having functions that abstract the key argument would be very cool indeed!
Such as?

def add_notification_message(self, message)
    ti = get_current_context()["ti"]
    ti.xcom_push(key=self.notification_xcom_key, value=message)

@HAEKADI
Copy link
Contributor Author

HAEKADI commented Nov 29, 2024

What do you think ?

Good idea! I like the second option best. Having a function that takes the same number of arguments than xcom_pull/xcom_push seems overkill indeed 😄 But having functions that abstract the key argument would be very cool indeed! Such as?

def add_notification_message(self, message)
    ti = get_current_context()["ti"]
    ti.xcom_push(key=self.notification_xcom_key, value=message)

Yeah ! It's a bit more complicated since the message depends on an xcom_pull so we'll end up getting the context twice (when pulling a count of a column and in the helper add_notification_message) instead of once.

I think we need to think this through a bit more.

I will merge as is for now so we can move forward on the refactoring.

@HAEKADI HAEKADI merged commit 8e97697 into main Nov 29, 2024
4 checks passed
@HAEKADI HAEKADI deleted the refacto-ess branch November 29, 2024 17:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants