Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Support for Delta table history #163

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ test = [
"pytest-xdist",
"requests_mock",
"time_machine",
"freezegun",
]
docs = [
"markdown>=3.5.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,45 @@ def exists(self) -> bool:
result = True
except AnalysisException as e:
err_msg = str(e).lower()
common_message = (
f"Table `{self.table}` doesn't exist. "
f"The `create_if_not_exists` flag is set to {self.create_if_not_exists}."
)

if err_msg.startswith("[table_or_view_not_found]") or err_msg.startswith("table or view not found"):
if self.create_if_not_exists:
dannymeijer marked this conversation as resolved.
Show resolved Hide resolved
self.log.info(" ".join((common_message, "Therefore the table will be created.")))
else:
self.log.error(" ".join((common_message, "Therefore the table will not be created.")))
self.log.debug(f"Table `{self.table_name}` does not exist.")
else:
raise e

return result

def describe_history(self, limit: Optional[int] = None) -> Optional[DataFrame]:
    """
    Get the latest `limit` rows from the Delta Log.
    The information is in reverse chronological order.

    Parameters
    ----------
    limit : Optional[int]
        Number of rows to return. `None` (the default) returns the full history;
        any integer, including `0`, is passed on to `DataFrame.limit`.

    Returns
    -------
    Optional[DataFrame]
        Delta Table's history as a DataFrame or None if the table does not exist.

    Examples
    --------
    ```python
    DeltaTableStep(...).describe_history()
    ```
    Would return the full history from a Delta Log.

    ```python
    DeltaTableStep(...).describe_history(limit=10)
    ```
    Would return the last 10 operations from the Delta Log.
    """
    # Guard clause: nothing to describe when the table is missing.
    if not self.exists:
        self.log.warning(f"Table `{self.table_name}` does not exist.")
        return None

    history_df = self.spark.sql(f"DESCRIBE HISTORY {self.table_name}")
    # Sort explicitly so the "latest first" contract does not depend on the
    # order in which the engine happens to return the rows.
    history_df = history_df.orderBy("version", ascending=False)

    # Compare against None rather than truthiness so that limit=0 is honoured
    # instead of silently returning the whole history.
    if limit is not None:
        history_df = history_df.limit(limit)

    return history_df
165 changes: 165 additions & 0 deletions src/koheesio/spark/delta/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""
Utils for working with Delta tables.
"""
from typing import Optional, Union
from datetime import datetime, timedelta

from koheesio.logger import LoggingFactory
from koheesio.spark.delta import DeltaTableStep
from koheesio.utils.date_time import DTInterval

# Module-level logger named after this module, requested with inherit_from_koheesio=True.
log = LoggingFactory.get_logger(name=__name__, inherit_from_koheesio=True)


def is_data_stale(
    table: Union[str, DeltaTableStep],
    months: int = 0,
    weeks: int = 0,
    days: int = 0,
    hours: int = 0,
    minutes: int = 0,
    seconds: int = 0,
    dt_interval: Optional[DTInterval] = None,
    refresh_day_num: Optional[int] = None,
) -> bool:
    """
    Determines if the data inside a table is stale based on the elapsed time since
    the last modification and, optionally, based on the current week day.

    The function allows specifying limits in terms of months, weeks, days, hours, minutes, and seconds to determine
    how old data can be before it is considered stale. If `refresh_day_num` is provided, it adds an extra condition
    to mark data as stale if the current day matches with the specified weekday.

    The last modification date is taken from the Delta Log.

    Parameters
    ----------
    table : Union[str, DeltaTableStep]
        The table to check: either its name/path (a `DeltaTableStep` is built from it)
        or an already constructed `DeltaTableStep`.
    months : int, default 0
        Threshold in months to determine staleness. Converted to days as
        `int(months * 30.44)` (average month length, truncated).
    weeks : int, default 0
        Threshold in weeks.
    days : int, default 0
        Threshold in days.
    hours : int, default 0
        Threshold in hours.
    minutes : int, default 0
        Threshold in minutes.
    seconds : int, default 0
        Threshold in seconds.
    dt_interval : DTInterval, optional
        An alternative to directly specifying time components.
        This should be an instance of DTInterval, which provides
        the `to_timedelta` property that converts structured time
        descriptions into a timedelta object. Only used when the
        individual time components do not yield a period.
    refresh_day_num : int, optional
        The weekday number (0=Monday, 6=Sunday) on which
        data should be refreshed if it has not already. Enforces
        a maximum period limit of 6 days, 23 hours, 59 minutes and 59 seconds.

    Returns
    -------
    bool
        True if data is considered stale by exceeding the defined time limits or if the current
        day equals to `refresh_day_num`. Also True when the table has no history or no data
        modification operation is found in it. Returns False if conditions are not met.

    Raises
    ------
    ValueError
        If neither time components nor `dt_interval` are provided, if the time components do not
        yield a period and no `dt_interval` is given, if `refresh_day_num` is outside 0..6, or if
        `refresh_day_num` is set and the period is 7 days or longer.

    Examples
    --------
    Assume now is January 31st, 2025 (Friday) 12:00:00 and the last modification dates
    in the history are shown alongside the examples.

    Example 1: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day threshold:
    ```
    is_stale = is_data_stale(table_name, days=3)
    print(is_stale)  # True, as the last modification was 3 days and 1 hour ago which is more than 3 days.
    ```

    Example 2: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 1-hour threshold:
    ```
    is_stale = is_data_stale(table_name, days=3, hours=1)
    print(is_stale)  # True, as the last modification was 3 days and 1 hour ago which is the same as the threshold.
    ```

    Example 3: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 2-hour threshold:
    ```
    is_stale = is_data_stale(table_name, days=3, hours=2)
    print(is_stale)  # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours.
    ```

    Example 4: Last modified on January 28th, 2025, 11:00:00 checking with a 5-day threshold and refresh_day_num = 4 (Friday):
    ```
    is_stale = is_data_stale(table_name, days=5, refresh_day_num=4)
    print(is_stale)  # True, 3 days and 1 hour is less than 5 days but refresh_day_num is the same as the current day.
    ```
    """

    if not any((months, weeks, days, hours, minutes, seconds)) and dt_interval is None:
        raise ValueError(
            "You must provide either time components (months, weeks, days, hours, minutes, seconds) or dt_interval."
        )

    if months > 0:  # Convert months to days
        month_days = int(months * 30.44)  # Average month length
        days += month_days
        log.debug(f"Converted {months} months to {month_days} days.")

    if any((weeks, days, hours, minutes, seconds)):
        staleness_period = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)
    elif dt_interval is not None:
        staleness_period = dt_interval.to_timedelta
    else:
        # Reached when only `months` was given and it contributed no days
        # (negative, or small enough to truncate to 0). Fail with a clear
        # message instead of dereferencing a missing dt_interval.
        raise ValueError("The provided time components do not define a staleness period and no dt_interval was given.")

    if refresh_day_num is not None:
        if not 0 <= refresh_day_num <= 6:
            raise ValueError("refresh_day_num should be between 0 (Monday) and 6 (Sunday).")

        max_period = timedelta(days=6, hours=23, minutes=59, seconds=59)
        if staleness_period > max_period:
            raise ValueError("With refresh_day_num set, the total period must be less than 7 days.")

    # Accept both a ready-made step and a plain table name.
    delta_table = table if isinstance(table, DeltaTableStep) else DeltaTableStep(table=table)

    # Get the history of the Delta table
    history_df = delta_table.describe_history()

    if history_df is None:
        log.debug(f"No history found for `{table}`.")
        return True  # Consider data stale if the table does not exist

    modification_operations = [
        "WRITE",
        "MERGE",
        "DELETE",
        "UPDATE",
        "REPLACE TABLE AS SELECT",
        "CREATE TABLE AS SELECT",
        "TRUNCATE",
        "RESTORE",
    ]

    # Filter the history to data modification operations only
    history_df = history_df.filter(history_df["operation"].isin(modification_operations))

    # Get the last modification operation's timestamp
    # (describe_history returns the newest version first, so the first row is the latest).
    last_modification = history_df.select("timestamp").first()

    if not last_modification:
        log.debug(f"No modification operation found in the history for `{table}`.")
        return True

    last_modification_timestamp = last_modification["timestamp"]

    # Take "now" with the same tzinfo as the stored timestamp so naive and aware
    # datetimes are never compared; for a naive timestamp this is plain datetime.now().
    current_time = datetime.now(tz=last_modification_timestamp.tzinfo)
    cut_off_date = current_time - staleness_period

    log.debug(f"Last modification timestamp: {last_modification_timestamp}, cut-off date: {cut_off_date}")

    # Inclusive comparison: data exactly as old as the threshold counts as stale.
    is_stale_by_time = last_modification_timestamp <= cut_off_date

    if refresh_day_num is not None:
        current_day_of_week = current_time.weekday()
        log.debug(f"Current day of the week: {current_day_of_week}, refresh day: {refresh_day_num}")

        is_appropriate_day_for_refresh = current_day_of_week == refresh_day_num
        return is_stale_by_time or is_appropriate_day_for_refresh

    return is_stale_by_time
File renamed without changes.
120 changes: 120 additions & 0 deletions src/koheesio/utils/date_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
Utility functions related to date and time operations
"""
from typing import Optional
from datetime import timedelta
import re

from koheesio.models import BaseModel, model_validator

# Compiled pattern that recognises one "<number> <unit>" token per match.
# Supported units: years, months, weeks, days, hours, minutes/mins,
# seconds/secs, milliseconds/millis and microseconds/micros (singular forms
# are accepted as well). Every alternative has its own named group, so a
# single match populates exactly one group; use finditer() to collect all
# components of a longer string.
# At least one whitespace character is required between the number and the
# unit, and matching is case-sensitive (only re.VERBOSE is set).
extract_dt_interval = re.compile(
r"""
(?P<years>\d+)\s+years?\s*|
(?P<months>\d+)\s+months?\s*|
(?P<weeks>\d+)\s+weeks?\s*|
(?P<days>\d+)\s+days?\s*|
(?P<hours>\d+)\s+hours?\s*|
(?P<minutes>\d+)\s+(?:minutes?|mins?)\s*|
(?P<seconds>\d+)\s+(?:seconds?|secs?)\s*|
(?P<milliseconds>\d+)\s+(?:milliseconds?|millis?)\s*|
(?P<microseconds>\d+)\s+(?:microseconds?|micros?)\s*
""",
re.VERBOSE,
)


class DTInterval(BaseModel):
    """
    A class to define date and time intervals using human-readable strings or individual time components.

    Parameters
    ----------
    interval : str, optional
        A human-readable string specifying the duration of the interval, broken down
        into years, months, weeks, days, hours, minutes, seconds, milliseconds, and microseconds.
        Components found in the string override the matching keyword arguments; text that
        is not recognised as "<number> <unit>" is ignored.
    years : int, optional, default 0
        Number of years in the interval.
    months : int, optional, default 0
        Number of months in the interval.
    weeks : int, optional, default 0
        Number of weeks in the interval.
    days : int, optional, default 0
        Number of days in the interval.
    hours : int, optional, default 0
        Number of hours in the interval.
    minutes : int, optional, default 0
        Number of minutes in the interval.
    seconds : int, optional, default 0
        Number of seconds in the interval.
    milliseconds : int, optional, default 0
        Number of milliseconds in the interval.
    microseconds : int, optional, default 0
        Number of microseconds in the interval.

    Notes
    -----
    After validation, `years` and `months` are folded into `days` (using average lengths of
    365.25 and 30.44 days, truncated to whole days) and reset to 0. A component explicitly set
    to `None` is treated as 0.

    Examples
    --------
    Creating an instance with time components:

    ```
    print(DTInterval(years=2, weeks=3, hours=12).to_timedelta)
    ```
    751 days, 12:00:00

    Creating an instance from a string:
    ```
    print(DTInterval(interval="1 year 2 months 3 weeks 4 days 5 hours 100 minutes 200 seconds 300 milliseconds 400 microseconds").to_timedelta)
    ```
    451 days, 6:43:20.300400

    Attributes
    ----------
    to_timedelta : timedelta
        Read-only property (accessed without parentheses) that converts the DTInterval
        instance to a timedelta object, aggregating all specified time components.
    """

    interval: Optional[str] = None
    years: Optional[int] = 0  # folded into days: int(years * 365.25 + months * 30.44), truncated
    months: Optional[int] = 0  # folded into days together with years (average month = 30.44 days)
    weeks: Optional[int] = 0
    days: Optional[int] = 0
    hours: Optional[int] = 0
    minutes: Optional[int] = 0
    seconds: Optional[int] = 0
    milliseconds: Optional[int] = 0
    microseconds: Optional[int] = 0

    @model_validator(mode="before")
    @classmethod
    def process_interval(cls, values: dict) -> dict:
        """Processes the input interval string and extracts the time components"""
        # "before" validators receive the raw input, which is not guaranteed to
        # be a mapping; leave anything else for the regular validation to handle.
        if not isinstance(values, dict):
            return values

        interval_value = values.get("interval")
        if not interval_value:
            return values

        # Work on a copy so the caller's dictionary is not modified.
        parsed = dict(values)
        for match in extract_dt_interval.finditer(interval_value):
            for key, value in match.groupdict().items():
                if value is not None:
                    parsed[key] = int(value)
        return parsed

    @model_validator(mode="after")
    def calculate_days(self) -> "DTInterval":
        """Years and months are not supported in timedelta, so we need to convert them to days"""
        # The fields are Optional, so an explicit None must count as 0.
        years = self.years or 0
        months = self.months or 0
        if years or months:
            # Average year/month lengths; int() truncates towards zero.
            year_month_days = int(years * 365.25 + months * 30.44)
            self.days = (self.days or 0) + year_month_days
            self.years = 0
            self.months = 0
        return self

    @property
    def to_timedelta(self) -> timedelta:
        """Returns the object as a timedelta object"""
        # `or 0` guards against components that were explicitly set to None.
        return timedelta(
            days=self.days or 0,
            seconds=self.seconds or 0,
            microseconds=self.microseconds or 0,
            milliseconds=self.milliseconds or 0,
            minutes=self.minutes or 0,
            hours=self.hours or 0,
            weeks=self.weeks or 0,
        )
Loading