
Commit

Merge pull request #3734 from NicholasTurner23/update/Kafka-implementations

Update/kafka implementations
Baalmart authored Oct 23, 2024
2 parents b2d13eb + e9aeeab commit e9c68a1
Showing 6 changed files with 129 additions and 197 deletions.
42 changes: 40 additions & 2 deletions src/workflows/airqo_etl_utils/airqo_utils.py
@@ -458,8 +458,12 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
del device_group["site_id"]
del device_group["device_id"]
del device_group["device_number"]

averages = device_group.resample("1H", on="timestamp").mean()
try:
averages = device_group.resample("1H", on="timestamp").mean()
except ValueError as value_error:
print(f"Error: {value_error}")
print(device_group)
continue
averages["timestamp"] = averages.index
averages["device_id"] = device_id
averages["site_id"] = site_id
@@ -1028,3 +1032,37 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
"city",
]
)

    @staticmethod
    def get_devices(group_id: str) -> pd.DataFrame:
        """
        Fetches the list of devices from the 'devices-topic' Kafka topic and returns them as a DataFrame.
        Args:
            group_id (str): The group ID used for consuming devices from the topic.
        Returns:
            pd.DataFrame: A DataFrame containing the list of devices.
        """
        from airqo_etl_utils.message_broker_utils import MessageBrokerUtils

        broker = MessageBrokerUtils()
        devices_list: list = []

        for device in broker.consume_from_topic(
            topic="devices-topic",
            group_id=group_id,
            auto_offset_reset="earliest",
            from_beginning=True,
        ):
            devices_list.append(device)

        devices = pd.DataFrame(devices_list)
        # Will be removed in the future. Just here for initial tests.
        devices.drop(
            devices.columns[devices.columns.str.contains("^Unnamed")],
            axis=1,
            inplace=True,
        )

        return devices
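
A minimal usage sketch, not part of the commit: how get_devices might be called from a pipeline task or a local test. The group id string is illustrative, and the call assumes the Kafka brokers configured for MessageBrokerUtils are reachable.

# Illustrative only -- the group id below is a made-up example.
from airqo_etl_utils.airqo_utils import AirQoDataUtils

# Each distinct group id keeps its own committed offsets on "devices-topic";
# with auto_offset_reset="earliest" a new group id replays the topic from the start.
devices = AirQoDataUtils.get_devices(group_id="example-devices-reader")
print(len(devices), "device records consumed")
print(devices.columns.tolist())  # expected to include tenant, device_id, site_id, latitude, longitude
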
54 changes: 52 additions & 2 deletions src/workflows/airqo_etl_utils/data_validator.py
@@ -136,13 +136,63 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:

    @staticmethod
    def process_for_message_broker(
        data: pd.DataFrame, tenant: Tenant, frequency: Frequency = Frequency.HOURLY
        data: pd.DataFrame,
        tenant: Tenant,
        frequency: Frequency = Frequency.HOURLY,
        topic: str = None,
        caller: str = None,
    ) -> pd.DataFrame:
"""
Processes the input DataFrame for message broker consumption based on the specified tenant, frequency, and topic.
Args:
data (pd.DataFrame): The input data to be processed.
tenant (Tenant): The tenant filter for the data, defaults to Tenant.ALL.
frequency (Frequency): The data frequency (e.g., hourly), defaults to Frequency.HOURLY.
topic (str, optional): The Kafka topic being processed, defaults to None.
caller (str, optional): The group ID or identifier for devices processing, defaults to None.
Returns:
pd.DataFrame: The processed DataFrame ready for message broker consumption.
"""
        print(caller)
        from .airqo_utils import AirQoDataUtils

        data.loc[:, "frequency"] = str(frequency)
        data["timestamp"] = pd.to_datetime(data["timestamp"])
        data["timestamp"] = data["timestamp"].apply(date_to_str)
        data["timestamp"] = data["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")

        if tenant != Tenant.ALL:
            data.loc[:, "tenant"] = str(tenant)

        # Additional processing for hourly measurements topic
        if topic == configuration.HOURLY_MEASUREMENTS_TOPIC:
            data.rename(columns={"device_id": "device_name"}, inplace=True)

            devices = AirQoDataUtils.get_devices(group_id=caller)
            devices = devices[
                ["tenant", "device_id", "site_id", "latitude", "longitude"]
            ]
            devices.rename(
                columns={
                    "device_id": "device_name",
                    "mongo_id": "device_id",
                    "latitude": "device_latitude",
                    "longitude": "device_longitude",
                },
                inplace=True,
            )

            data = pd.merge(
                left=data,
                right=devices,
                on=["device_name", "site_id", "tenant"],
                how="left",
            )

            data.rename(columns={"tenant": "network"}, inplace=True)
            data["tenant"] = str(Tenant.AIRQO)

        return data
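
A hedged sketch, not from the commit, of how the new topic and caller parameters might be wired up when preparing hourly measurements for publishing. The DataFrame contents and group id are illustrative; the class name and import paths are assumptions drawn from this file's context, and the device-metadata merge only works with a reachable broker.

# Illustrative wiring of process_for_message_broker to the hourly topic (assumed names).
import pandas as pd

from airqo_etl_utils.config import configuration                 # assumed location of `configuration`
from airqo_etl_utils.constants import Frequency, Tenant          # assumed location of the enums
from airqo_etl_utils.data_validator import DataValidationUtils   # assumed class name in data_validator.py

hourly_data = pd.DataFrame(
    {
        "device_id": ["aq_g5_001"],
        "site_id": ["site_123"],
        "tenant": ["airqo"],
        "timestamp": ["2024-10-23 10:00:00"],
        "pm2_5": [12.3],
    }
)

processed = DataValidationUtils.process_for_message_broker(
    data=hourly_data,
    tenant=Tenant.ALL,                                # Tenant.ALL skips the tenant overwrite
    frequency=Frequency.HOURLY,
    topic=configuration.HOURLY_MEASUREMENTS_TOPIC,    # triggers the device-metadata merge
    caller="example-hourly-publisher",                # consumer group id passed to get_devices
)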

    @staticmethod
191 changes: 7 additions & 184 deletions src/workflows/airqo_etl_utils/message_broker_utils.py
@@ -31,199 +31,19 @@ def __init__(self):
"""
self.__partitions = configuration.TOPIC_PARTITIONS
self.__bootstrap_servers = configuration.BOOTSTRAP_SERVERS
# Initialize partition loads
# Initialize partition loads. Should only be used if you know the partions available.
# Note: This should be updated in case the number of partions used changes.
self.partition_loads = {int(p): 0 for p in self.__partitions}
self.config = {
"bootstrap.servers": self.__bootstrap_servers,
"request.timeout.ms": 300000,
"metadata.max.age.ms": 60000,
}

    def __get_partition(self, current_partition) -> int:
        """Get next partition to load data into -- roundrobin"""
        current_partition = current_partition + 1
        if current_partition in self.__partitions:
            return current_partition
        return self.__partitions[0]

    def __get_least_loaded_partition(self) -> int:
        """Select the least loaded partition."""
        return min(self.partition_loads, key=self.partition_loads.get)
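
A short, self-contained sketch of the bookkeeping these two helpers imply; it is an assumption about how a caller might use them, since the producing code that updates partition_loads is not visible in this excerpt.

# Hypothetical illustration of least-loaded partition selection.
partitions = [0, 1, 2]                        # stand-in for configuration.TOPIC_PARTITIONS
partition_loads = {p: 0 for p in partitions}  # mirrors self.partition_loads

def least_loaded(loads: dict) -> int:
    # Same expression as __get_least_loaded_partition above.
    return min(loads, key=loads.get)

for batch_size in (50, 50, 120):              # pretend message batches
    target = least_loaded(partition_loads)
    partition_loads[target] += batch_size      # a caller would record the load it just sent
    print(f"batch of {batch_size} records -> partition {target}")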

    @classmethod
    def __on_success(cls, record_metadata):
        logger.info(
            f"Successfully sent message: Topic-{record_metadata.topic}, Partition-{record_metadata.partition}, Offset-{record_metadata.offset}"
        )

    @classmethod
    def __on_error(cls, record_metadata):
        logger.exception(
            f"Failed to send message: Topic-{record_metadata.topic}, Partition-{record_metadata.partition}, Offset-{record_metadata.offset}"
        )

    def __send_data(self, topic: str, data: pd.DataFrame, partition: int = None):
        """
        Send data in chunks to a specified Kafka topic.
        Args:
            topic: Kafka topic to send the data to.
            data: Pandas DataFrame containing the data to send.
            partition: Optional partition to which the message should be sent.
        """
        data.to_csv("message_broker_data.csv", index=False)
        producer = Producer(
            bootstrap_servers=self.__bootstrap_servers,
            api_version_auto_timeout_ms=300000,
            retries=5,
            request_timeout_ms=300000,
        )

        print("Dataframe info : ")
        print(data.info())
        print("Dataframe description : ")
        print(data.describe())

        chunks = int(len(data) / 50)
        chunks = chunks if chunks > 0 else 1
        dataframes = np.array_split(data, chunks)
        current_partition = -1
        for dataframe in dataframes:
            dataframe = pd.DataFrame(dataframe).replace(np.nan, None)
            message = {"data": dataframe.to_dict("records")}

            current_partition = (
                partition
                if partition or partition == 0
                else self.__get_partition(current_partition=current_partition)
            )
            kafka_message = json.dumps(message, allow_nan=True).encode("utf-8")

            producer.send(
                topic=topic,
                value=kafka_message,
                partition=current_partition,
            ).add_callback(self.__on_success).add_errback(self.__on_error)

    @staticmethod
    def update_hourly_data_topic(data: pd.DataFrame):
        """
        Update the hourly data topic with additional device metadata,
        including device names, latitude, longitude, and tenant information.
        Args:
            data(pandas.DataFrame): The Pandas DataFrame to update with device metadata.
        """
        devices = AirQoApi().get_devices(tenant=Tenant.ALL)
        devices = pd.DataFrame(devices)
        devices = devices[
            [
                "tenant",
                "device_id",
                "device_number",
                "site_id",
                "latitude",
                "longitude",
            ]
        ]
        devices.rename(
            columns={
                "device_id": "device_name",
                "_id": "device_id",
                "latitude": "device_latitude",
                "longitude": "device_longitude",
            },
            inplace=True,
        )

        data.rename(
            columns={
                "device_id": "device_name",
            },
            inplace=True,
        )

        data.drop(columns=["device_number"], inplace=True, errors="ignore")

        data = pd.merge(
            left=data,
            right=devices,
            on=["device_name", "site_id", "tenant"],
            how="left",
        )
        data.rename(
            columns={
                "tenant": "network",
            },
            inplace=True,
        )
        data["tenant"] = str(Tenant.AIRQO)
        data["timestamp"] = pd.to_datetime(data["timestamp"])
        data["timestamp"] = data["timestamp"].apply(date_to_str)

        MessageBrokerUtils().publish_to_topic(
            configuration.HOURLY_MEASUREMENTS_TOPIC, data
        )

    @staticmethod
    def update_hourly_data_topic_(data: pd.DataFrame) -> None:
        """
        Update the hourly data topic with additional device metadata,
        including device names, latitude, longitude, and tenant information.
        Args:
            data (pandas.DataFrame): The Pandas DataFrame to update with device metadata.
        """
        # Fetch and prepare device data
        devices = AirQoApi().get_devices(tenant=Tenant.ALL)
        devices = pd.DataFrame(devices)
        devices = devices[
            [
                "tenant",
                "device_id",
                "device_number",
                "site_id",
                "latitude",
                "longitude",
            ]
        ]

        devices.rename(
            columns={
                "device_id": "device_name",
                "_id": "device_id",
                "latitude": "device_latitude",
                "longitude": "device_longitude",
            },
            inplace=True,
        )

        data.rename(
            columns={
                "device_id": "device_name",
            },
            inplace=True,
        )

        data.drop(columns=["device_number"], inplace=True, errors="ignore")

        data = pd.concat(
            [
                data.set_index(["device_name", "site_id", "tenant"]),
                devices.set_index(["device_name", "site_id", "tenant"]),
            ],
            axis=1,
        ).reset_index()

        data.rename(columns={"tenant": "network"}, inplace=True)
        data["tenant"] = str(Tenant.AIRQO)
        data["timestamp"] = pd.to_datetime(data["timestamp"])
        data["timestamp"] = data["timestamp"].apply(date_to_str)

        MessageBrokerUtils().publish_to_topic(
            configuration.HOURLY_MEASUREMENTS_TOPIC, data
        )

    def __on_delivery(self, err, msg):
        """
        Delivery callback for Kafka message send operations.
@@ -326,9 +146,11 @@ def publish_to_topic(
            {
                "retries": 5,
                "batch.num.messages": 1000,
                "retry.backoff.ms": 80000,
                "batch.size": 1 * 1024 * 1024,
                "retry.backoff.ms": 1000,
                "debug": "msg",
                "message.timeout.ms": 300000,
                "message.max.bytes": 2 * 1024 * 1024,
            }
        )
        producer = Producer(producer_config)
@@ -340,7 +162,7 @@
        if column_key:
            logger.info(f"Using '{column_key}' as the key for messages")
            for row in dataframe_list:
                key = row.pop(column_key, None)
                key = row.get(column_key, None)
                if key is None:
                    logger.warning(
                        f"No key found for column '{column_key}' in row: {row}"
@@ -409,6 +231,7 @@ def consume_from_topic(
"group.id": group_id,
"auto.offset.reset": auto_offset_reset,
"enable.auto.commit": "true" if auto_commit else "false",
"fetch.message.max.bytes": 2 * 1024 * 1024,
}
)

