Update/kafka implementations #3734

Merged
42 changes: 40 additions & 2 deletions src/workflows/airqo_etl_utils/airqo_utils.py
@@ -458,8 +458,12 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
del device_group["site_id"]
del device_group["device_id"]
del device_group["device_number"]

averages = device_group.resample("1H", on="timestamp").mean()
try:
averages = device_group.resample("1H", on="timestamp").mean()
except ValueError as value_error:
print(f"Error: {value_error}")
print(device_group)
continue
averages["timestamp"] = averages.index
averages["device_id"] = device_id
averages["site_id"] = site_id
@@ -1028,3 +1032,37 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
"city",
]
)

@staticmethod
def get_devices(group_id: str) -> pd.DataFrame:
"""
Fetches the list of devices from the 'devices-topic' Kafka topic and returns them as a DataFrame.

Args:
group_id (str): The group ID used for consuming devices from the topic.

Returns:
pd.DataFrame: A DataFrame containing the list of devices.
"""
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils

broker = MessageBrokerUtils()
devices_list: list = []

for device in broker.consume_from_topic(
topic="devices-topic",
group_id=group_id,
auto_offset_reset="earliest",
from_beginning=True,
):
devices_list.append(device)

devices = pd.DataFrame(devices_list)
# Will be removed in the future. Just here for initial tests.
devices.drop(
devices.columns[devices.columns.str.contains("^Unnamed")],
axis=1,
inplace=True,
)

return devices
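
A hedged usage sketch for the new helper; the `group_id` value below is a placeholder, and the exact shape of the records published to `devices-topic` is not visible in this diff:

```python
from airqo_etl_utils.airqo_utils import AirQoDataUtils

# Consume the full device registry from the "devices-topic" Kafka topic.
# The group_id identifies this caller so Kafka tracks its offsets separately.
devices = AirQoDataUtils.get_devices(group_id="devices-etl-test")

# One row per device record consumed from the topic.
print(devices.shape)
print(devices.head())
```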
54 changes: 52 additions & 2 deletions src/workflows/airqo_etl_utils/data_validator.py
@@ -136,13 +136,63 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:

@staticmethod
def process_for_message_broker(
data: pd.DataFrame, tenant: Tenant, frequency: Frequency = Frequency.HOURLY
data: pd.DataFrame,
tenant: Tenant,
frequency: Frequency = Frequency.HOURLY,
topic: str = None,
caller: str = None,
) -> pd.DataFrame:
"""
Processes the input DataFrame for message broker consumption based on the specified tenant, frequency, and topic.

Args:
data (pd.DataFrame): The input data to be processed.
            tenant (Tenant): The tenant filter for the data.
frequency (Frequency): The data frequency (e.g., hourly), defaults to Frequency.HOURLY.
topic (str, optional): The Kafka topic being processed, defaults to None.
caller (str, optional): The group ID or identifier for devices processing, defaults to None.

Returns:
pd.DataFrame: The processed DataFrame ready for message broker consumption.
"""
print(caller)
from .airqo_utils import AirQoDataUtils

data.loc[:, "frequency"] = str(frequency)
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["timestamp"] = data["timestamp"].apply(date_to_str)
data["timestamp"] = data["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")

if tenant != Tenant.ALL:
data.loc[:, "tenant"] = str(tenant)

# Additional processing for hourly measurements topic
if topic == configuration.HOURLY_MEASUREMENTS_TOPIC:
data.rename(columns={"device_id": "device_name"}, inplace=True)

devices = AirQoDataUtils.get_devices(group_id=caller)
devices = devices[
["tenant", "device_id", "site_id", "latitude", "longitude"]
]
devices.rename(
columns={
"device_id": "device_name",
"mongo_id": "device_id",
"latitude": "device_latitude",
"longitude": "device_longitude",
},
inplace=True,
)

data = pd.merge(
left=data,
right=devices,
on=["device_name", "site_id", "tenant"],
how="left",
)

data.rename(columns={"tenant": "network"}, inplace=True)
data["tenant"] = str(Tenant.AIRQO)

return data

@staticmethod
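
Putting the new parameters together, a sketch of how a workflow task might call this method for the hourly-measurements topic. The class name `DataValidationUtils`, the `configuration`/enum import paths, and the `caller` value are assumptions not shown in this diff:

```python
import pandas as pd

from airqo_etl_utils.config import configuration  # assumed import path
from airqo_etl_utils.constants import Frequency, Tenant  # assumed import path
from airqo_etl_utils.data_validator import DataValidationUtils  # assumed class name

# Minimal illustrative hourly measurements frame.
hourly_data = pd.DataFrame(
    {
        "device_id": ["aq_01"],
        "site_id": ["site_001"],
        "tenant": ["airqo"],
        "timestamp": ["2024-01-01T00:00:00Z"],
        "pm2_5": [12.3],
    }
)

prepared = DataValidationUtils.process_for_message_broker(
    data=hourly_data,
    tenant=Tenant.ALL,
    frequency=Frequency.HOURLY,
    topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
    caller="hourly-measurements-consumer",  # hypothetical group id passed to get_devices()
)
```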
191 changes: 7 additions & 184 deletions src/workflows/airqo_etl_utils/message_broker_utils.py
@@ -31,199 +31,19 @@ def __init__(self):
"""
self.__partitions = configuration.TOPIC_PARTITIONS
self.__bootstrap_servers = configuration.BOOTSTRAP_SERVERS
# Initialize partition loads
# Initialize partition loads. Should only be used if you know the partitions available.
# Note: This should be updated if the number of partitions in use changes.
self.partition_loads = {int(p): 0 for p in self.__partitions}
self.config = {
"bootstrap.servers": self.__bootstrap_servers,
"request.timeout.ms": 300000,
"metadata.max.age.ms": 60000,
}

def __get_partition(self, current_partition) -> int:
"""Get next partition to load data into -- roundrobin"""
current_partition = current_partition + 1
if current_partition in self.__partitions:
return current_partition
return self.__partitions[0]

def __get_least_loaded_partition(self) -> int:
"""Select the least loaded partition."""
return min(self.partition_loads, key=self.partition_loads.get)

@classmethod
def __on_success(cls, record_metadata):
logger.info(
f"Successfully sent message: Topic-{record_metadata.topic}, Partition-{record_metadata.partition}, Offset-{record_metadata.offset}"
)

@classmethod
def __on_error(cls, record_metadata):
logger.exception(
f"Failed to send message: Topic-{record_metadata.topic}, Partition-{record_metadata.partition}, Offset-{record_metadata.offset}"
)

def __send_data(self, topic: str, data: pd.DataFrame, partition: int = None):
"""
Send data in chunks to a specified Kafka topic.

Args:
topic: Kafka topic to send the data to.
data: Pandas DataFrame containing the data to send.
partition: Optional partition to which the message should be sent.
"""
data.to_csv("message_broker_data.csv", index=False)
producer = Producer(
bootstrap_servers=self.__bootstrap_servers,
api_version_auto_timeout_ms=300000,
retries=5,
request_timeout_ms=300000,
)

print("Dataframe info : ")
print(data.info())
print("Dataframe description : ")
print(data.describe())

chunks = int(len(data) / 50)
chunks = chunks if chunks > 0 else 1
dataframes = np.array_split(data, chunks)
current_partition = -1
for dataframe in dataframes:
dataframe = pd.DataFrame(dataframe).replace(np.nan, None)
message = {"data": dataframe.to_dict("records")}

current_partition = (
partition
if partition or partition == 0
else self.__get_partition(current_partition=current_partition)
)
kafka_message = json.dumps(message, allow_nan=True).encode("utf-8")

producer.send(
topic=topic,
value=kafka_message,
partition=current_partition,
).add_callback(self.__on_success).add_errback(self.__on_error)

@staticmethod
def update_hourly_data_topic(data: pd.DataFrame):
"""
Update the hourly data topic with additional device metadata,
including device names, latitude, longitude, and tenant information.

Args:
data(pandas.DataFrame): The Pandas DataFrame to update with device metadata.
"""
devices = AirQoApi().get_devices(tenant=Tenant.ALL)
devices = pd.DataFrame(devices)
devices = devices[
[
"tenant",
"device_id",
"device_number",
"site_id",
"latitude",
"longitude",
]
]
devices.rename(
columns={
"device_id": "device_name",
"_id": "device_id",
"latitude": "device_latitude",
"longitude": "device_longitude",
},
inplace=True,
)

data.rename(
columns={
"device_id": "device_name",
},
inplace=True,
)

data.drop(columns=["device_number"], inplace=True, errors="ignore")

data = pd.merge(
left=data,
right=devices,
on=["device_name", "site_id", "tenant"],
how="left",
)
data.rename(
columns={
"tenant": "network",
},
inplace=True,
)
data["tenant"] = str(Tenant.AIRQO)
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["timestamp"] = data["timestamp"].apply(date_to_str)

MessageBrokerUtils().publish_to_topic(
configuration.HOURLY_MEASUREMENTS_TOPIC, data
)

@staticmethod
def update_hourly_data_topic_(data: pd.DataFrame) -> None:
"""
Update the hourly data topic with additional device metadata,
including device names, latitude, longitude, and tenant information.

Args:
data (pandas.DataFrame): The Pandas DataFrame to update with device metadata.
"""
# Fetch and prepare device data
devices = AirQoApi().get_devices(tenant=Tenant.ALL)
devices = pd.DataFrame(devices)
devices = devices[
[
"tenant",
"device_id",
"device_number",
"site_id",
"latitude",
"longitude",
]
]

devices.rename(
columns={
"device_id": "device_name",
"_id": "device_id",
"latitude": "device_latitude",
"longitude": "device_longitude",
},
inplace=True,
)

data.rename(
columns={
"device_id": "device_name",
},
inplace=True,
)

data.drop(columns=["device_number"], inplace=True, errors="ignore")

data = pd.concat(
[
data.set_index(["device_name", "site_id", "tenant"]),
devices.set_index(["device_name", "site_id", "tenant"]),
],
axis=1,
).reset_index()

data.rename(columns={"tenant": "network"}, inplace=True)
data["tenant"] = str(Tenant.AIRQO)
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["timestamp"] = data["timestamp"].apply(date_to_str)

MessageBrokerUtils().publish_to_topic(
configuration.HOURLY_MEASUREMENTS_TOPIC, data
)

def __on_delivery(self, err, msg):
"""
Delivery callback for Kafka message send operations.
@@ -326,9 +146,11 @@ def publish_to_topic(
{
"retries": 5,
"batch.num.messages": 1000,
"retry.backoff.ms": 80000,
"batch.size": 1 * 1024 * 1024,
"retry.backoff.ms": 1000,
"debug": "msg",
"message.timeout.ms": 300000,
"message.max.bytes": 2 * 1024 * 1024,
}
)
producer = Producer(producer_config)
@@ -340,7 +162,7 @@ def publish_to_topic(
if column_key:
logger.info(f"Using '{column_key}' as the key for messages")
for row in dataframe_list:
key = row.pop(column_key, None)
key = row.get(column_key, None)
if key is None:
logger.warning(
f"No key found for column '{column_key}' in row: {row}"
@@ -409,6 +231,7 @@ def consume_from_topic(
"group.id": group_id,
"auto.offset.reset": auto_offset_reset,
"enable.auto.commit": "true" if auto_commit else "false",
"fetch.message.max.bytes": 2 * 1024 * 1024,
}
)

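
And the consumer side, a sketch with the enlarged `fetch.message.max.bytes` added in this hunk; the broker address, topic, and group id are placeholders:

```python
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder brokers
        "group.id": "devices-etl-test",  # placeholder group id
        "auto.offset.reset": "earliest",
        "enable.auto.commit": "true",
        "fetch.message.max.bytes": 2 * 1024 * 1024,  # match the 2 MB producer limit
    }
)
consumer.subscribe(["devices-topic"])

for _ in range(100):  # bounded poll loop so the sketch terminates
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    print(msg.value().decode("utf-8"))

consumer.close()
```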