Update/kafka implementations #3760

Merged
7 changes: 4 additions & 3 deletions src/workflows/airqo_etl_utils/data_validator.py
@@ -276,7 +276,7 @@ def process_data_for_api(data: pd.DataFrame) -> list:

return restructured_data

-def transform_devices(devices: List[Dict[str, Any]], task_instance) -> pd.DataFrame:
+def transform_devices(devices: List[Dict[str, Any]], taskinstance) -> pd.DataFrame:
"""
Transforms and processes the devices DataFrame. If the checksum of the
devices data has not changed since the last execution, it returns an empty DataFrame.
@@ -292,6 +292,7 @@ def transform_devices(devices: List[Dict[str, Any]], task_instance) -> pd.DataFrame:
"""
import hashlib

+devices = pd.DataFrame(devices)
devices.rename(
columns={
"device_id": "device_name",
@@ -306,11 +307,11 @@ def transform_devices(devices: List[Dict[str, Any]], task_instance) -> pd.DataFrame:
devices_json = devices.to_json(orient="records", date_format="iso")
api_devices_checksum = hashlib.md5(devices_json.encode()).hexdigest()

-previous_checksum = task_instance.xcom_pull(key="devices_checksum")
+previous_checksum = taskinstance.xcom_pull(key="devices_checksum")

if previous_checksum == api_devices_checksum:
return pd.DataFrame()

-task_instance.xcom_push(key="devices_checksum", value=api_devices_checksum)
+taskinstance.xcom_push(key="devices_checksum", value=api_devices_checksum)

Contributor comment on lines +310 to 316:

🛠️ Refactor suggestion

Consider adding error handling for XCom operations.

The XCom operations could fail silently if there are issues with the Airflow context or XCom backend. Consider adding try-except blocks to handle potential errors gracefully.

-        previous_checksum = taskinstance.xcom_pull(key="devices_checksum")
+        try:
+            previous_checksum = taskinstance.xcom_pull(key="devices_checksum")
+        except Exception as e:
+            logger.warning(f"Failed to pull previous checksum from XCom: {e}")
+            previous_checksum = None

         if previous_checksum == api_devices_checksum:
             return pd.DataFrame()

-        taskinstance.xcom_push(key="devices_checksum", value=api_devices_checksum)
+        try:
+            taskinstance.xcom_push(key="devices_checksum", value=api_devices_checksum)
+        except Exception as e:
+            logger.error(f"Failed to push new checksum to XCom: {e}")

Committable suggestion was skipped due to low confidence.

return devices
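
For reference, a minimal sketch of how the checksum guard and the hardened XCom calls could fit together (a sketch only, not the committed code; the column renaming is elided, and the module-level logger is an assumption):

import hashlib
import logging
from typing import Any, Dict, List

import pandas as pd

logger = logging.getLogger(__name__)  # assumed; the module may already define one


def transform_devices(devices: List[Dict[str, Any]], taskinstance) -> pd.DataFrame:
    """Return the devices frame, or an empty frame if nothing changed."""
    devices = pd.DataFrame(devices)

    # Checksum the serialized frame so unchanged payloads can be skipped.
    devices_json = devices.to_json(orient="records", date_format="iso")
    api_devices_checksum = hashlib.md5(devices_json.encode()).hexdigest()

    # Guard the XCom read: a backend failure should not abort the task.
    try:
        previous_checksum = taskinstance.xcom_pull(key="devices_checksum")
    except Exception as e:
        logger.warning(f"Failed to pull previous checksum from XCom: {e}")
        previous_checksum = None

    if previous_checksum == api_devices_checksum:
        return pd.DataFrame()

    try:
        taskinstance.xcom_push(key="devices_checksum", value=api_devices_checksum)
    except Exception as e:
        logger.error(f"Failed to push new checksum to XCom: {e}")

    return devices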
4 changes: 2 additions & 2 deletions src/workflows/airqo_etl_utils/message_broker_utils.py
@@ -35,7 +35,7 @@ def __init__(self):
# Note: This should be updated in case the number of partitions used changes.
self.partition_loads = {int(p): 0 for p in self.__partitions}
self.config = {
"bootstrap.servers": self.__bootstrap_servers,
"bootstrap.servers": "35.187.179.27:30200,34.79.78.204:30200,34.78.90.92:30200",
Contributor comment:

⚠️ Potential issue

Critical: Remove hardcoded Kafka bootstrap servers.

The bootstrap servers should not be hardcoded in the source code. This practice:

  • Makes deployment across different environments challenging
  • Exposes sensitive infrastructure information
  • Violates configuration management best practices

Replace the hardcoded servers with the existing configuration reference:

-            "bootstrap.servers": "35.187.179.27:30200,34.79.78.204:30200,34.78.90.92:30200",
+            "bootstrap.servers": self.__bootstrap_servers,

Committable suggestion was skipped due to low confidence.
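
A hedged sketch of one way to source the broker list from the environment instead of the source tree (KAFKA_BOOTSTRAP_SERVERS is an assumed variable name, not necessarily this repo's actual configuration key):

import os

# KAFKA_BOOTSTRAP_SERVERS is an assumed variable name; the project's
# configuration module may expose this under a different key.
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS")
if not bootstrap_servers:
    raise ValueError("KAFKA_BOOTSTRAP_SERVERS is not set")

config = {
    "bootstrap.servers": bootstrap_servers,
    "metadata.max.age.ms": 60000,
}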

"metadata.max.age.ms": 60000,
}

@@ -205,7 +205,7 @@ def consume_from_topic(
max_messages: Optional[int] = None,
auto_commit: bool = True,
offset: Optional[int] = None,
-wait_time_sec: int = 30,
+wait_time_sec: int = 40,
streaming: bool = False,
) -> Any:
"""
3 changes: 1 addition & 2 deletions src/workflows/dags/airqo_kafka_workflows.py
@@ -28,8 +28,7 @@ def send_device_data_to_broker(devices: pd.DataFrame, **kwargs) -> None:
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.data_validator import DataValidationUtils

-data_validation = DataValidationUtils()
-devices = data_validation.transform_devices(
+devices = DataValidationUtils.transform_devices(
devices=devices, taskinstance=kwargs["ti"]
)
if not devices.empty:
12 changes: 10 additions & 2 deletions src/workflows/dags/airqo_measurements.py
@@ -96,12 +96,16 @@ def send_hourly_measurements_to_message_broker(
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.constants import Tenant
+from datetime import datetime

+now = datetime.now()
+unique_str = str(now.date()) + "-" + str(now.hour)
Contributor comment on lines +99 to +102:

🛠️ Refactor suggestion

Consider using a more robust unique identifier format

While using date and hour for uniqueness is a good start, there could be edge cases where multiple DAG runs occur within the same hour. Consider enhancing the unique string generation to include minutes and a random component:

-unique_str = str(now.date()) + "-" + str(now.hour)
+unique_str = f"{now.strftime('%Y-%m-%d-%H-%M')}-{hash(now.timestamp())}"

This would provide better guarantees of uniqueness while maintaining temporal ordering.

Also applies to: 372-375
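
Another option worth weighing: Airflow already injects a per-run run_id into the task context, so the caller string could reuse it instead of hand-rolling a timestamp. A sketch, assuming the task receives the standard context via **kwargs:

def unique_caller(**kwargs) -> str:
    # run_id (e.g. "scheduled__2024-01-01T00:00:00+00:00") is unique per
    # DAG run, so no manual timestamp component is needed.
    return f"{kwargs['dag'].dag_id}-{kwargs['run_id']}"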


data = DataValidationUtils.process_data_for_message_broker(
data=data,
tenant=Tenant.AIRQO,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id,
caller=kwargs["dag"].dag_id + unique_str,
Contributor comment on lines +99 to +108:

🛠️ Refactor suggestion

Consider extracting unique string generation to a utility function

The unique string generation logic is duplicated in both DAGs. Consider extracting this to a utility function in airqo_etl_utils to maintain DRY principles and ensure consistent implementation across all DAGs.

Example implementation:

# In airqo_etl_utils/date.py
from datetime import datetime

def generate_unique_caller_id(dag_id: str) -> str:
    now = datetime.now()
    unique_str = f"{now.strftime('%Y-%m-%d-%H-%M')}-{hash(now.timestamp())}"
    return f"{dag_id}{unique_str}"

Then in the DAGs:

-now = datetime.now()
-unique_str = str(now.date()) + "-" + str(now.hour)
-caller=kwargs["dag"].dag_id + unique_str,
+caller=generate_unique_caller_id(kwargs["dag"].dag_id),

Also applies to: 372-381

)
broker = MessageBrokerUtils()
broker.publish_to_topic(
@@ -365,12 +369,16 @@ def send_hourly_measurements_to_message_broker(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.constants import Tenant
+from datetime import datetime

+now = datetime.now()
+unique_str = str(now.date()) + "-" + str(now.hour)

data = DataValidationUtils.process_data_for_message_broker(
data=data,
tenant=Tenant.AIRQO,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id,
caller=kwargs["dag"].dag_id + unique_str,
)
broker = MessageBrokerUtils()
broker.publish_to_topic(