diff --git a/pyproject.toml b/pyproject.toml
index c56c7b78..89641a39 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -80,6 +80,7 @@ test = [
     "pytest-xdist",
     "requests_mock",
     "time_machine",
+    "freezegun",
 ]
 docs = [
     "markdown>=3.5.2",
diff --git a/src/koheesio/spark/delta.py b/src/koheesio/spark/delta/__init__.py
similarity index 89%
rename from src/koheesio/spark/delta.py
rename to src/koheesio/spark/delta/__init__.py
index 70a084df..cda3e829 100644
--- a/src/koheesio/spark/delta.py
+++ b/src/koheesio/spark/delta/__init__.py
@@ -214,7 +214,7 @@ def _alter_table() -> None:
                 if override:
                     self.log.debug(
                         f"Property `{key}` presents in `{self.table_name}` and has value `{persisted_properties[key]}`."
-                        f"Override is enabled.The value will be changed to `{v_str}`."
+                        f"Override is enabled. The value will be changed to `{v_str}`."
                     )
                     _alter_table()
                 else:
@@ -307,17 +307,45 @@ def exists(self) -> bool:
             result = True
         except AnalysisException as e:
             err_msg = str(e).lower()
-            common_message = (
-                f"Table `{self.table}` doesn't exist. "
-                f"The `create_if_not_exists` flag is set to {self.create_if_not_exists}."
-            )
             if err_msg.startswith("[table_or_view_not_found]") or err_msg.startswith("table or view not found"):
-                if self.create_if_not_exists:
-                    self.log.info(" ".join((common_message, "Therefore the table will be created.")))
-                else:
-                    self.log.error(" ".join((common_message, "Therefore the table will not be created.")))
+                self.log.debug(f"Table `{self.table_name}` does not exist.")
             else:
                 raise e
 
         return result
+
+    def describe_history(self, limit: Optional[int] = None) -> Optional[DataFrame]:
+        """
+        Get the latest `limit` rows from the Delta Log.
+        The information is in reverse chronological order.
+
+        Parameters
+        ----------
+        limit : Optional[int]
+            Number of rows to return.
+
+        Returns
+        -------
+        Optional[DataFrame]
+            Delta Table's history as a DataFrame, or None if the table does not exist.
+
+        Examples
+        --------
+        ```python
+        DeltaTableStep(...).describe_history()
+        ```
+        Would return the full history from the Delta Log.
+
+        ```python
+        DeltaTableStep(...).describe_history(limit=10)
+        ```
+        Would return the last 10 operations from the Delta Log.
+        """
+        if self.exists:
+            history_df = self.spark.sql(f"DESCRIBE HISTORY {self.table_name}")
+            history_df = history_df.orderBy("version", ascending=False)
+            if limit:
+                history_df = history_df.limit(limit)
+            return history_df
+        else:
+            self.log.warning(f"Table `{self.table_name}` does not exist.")
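A minimal usage sketch of the new `describe_history` API (the table name is hypothetical; assumes a Delta table is registered in the metastore):

```python
from koheesio.spark.delta import DeltaTableStep

# Hypothetical table name -- assumes this Delta table exists in the metastore.
table = DeltaTableStep(table="my_schema.my_table")

history_df = table.describe_history(limit=20)
if history_df is not None:
    # History is returned newest-first, so first() yields the most recent WRITE.
    last_write = history_df.filter(history_df["operation"] == "WRITE").first()
    if last_write is not None:
        print(last_write["version"], last_write["timestamp"])
```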
diff --git a/src/koheesio/spark/delta/utils.py b/src/koheesio/spark/delta/utils.py
new file mode 100644
index 00000000..ed0a58bf
--- /dev/null
+++ b/src/koheesio/spark/delta/utils.py
@@ -0,0 +1,172 @@
+"""
+Utils for working with Delta tables.
+"""
+from typing import Optional
+from datetime import datetime, timedelta
+
+from koheesio.logger import LoggingFactory
+from koheesio.spark.delta import DeltaTableStep
+from koheesio.utils.date_time import DTInterval
+
+log = LoggingFactory.get_logger(name=__name__, inherit_from_koheesio=True)
+
+
+def is_data_stale(
+    table: str,
+    months: int = 0,
+    weeks: int = 0,
+    days: int = 0,
+    hours: int = 0,
+    minutes: int = 0,
+    seconds: int = 0,
+    dt_interval: Optional[DTInterval] = None,
+    refresh_day_num: Optional[int] = None,
+) -> bool:
+    """
+    Determines if the data inside a table is stale based on the elapsed time since
+    the last modification and, optionally, based on the current weekday.
+
+    The function allows specifying limits in terms of months, weeks, days, hours, minutes, and seconds to determine
+    how old data can be before it is considered stale. If `refresh_day_num` is provided, it adds an extra condition
+    to mark data as stale if the current day matches the specified weekday.
+
+    The last modification date is taken from the Delta Log.
+
+    Parameters
+    ----------
+    table : str
+        The name of the table to check.
+    months : int, default 0
+        Threshold in months to determine staleness.
+    weeks : int, default 0
+        Threshold in weeks.
+    days : int, default 0
+        Threshold in days.
+    hours : int, default 0
+        Threshold in hours.
+    minutes : int, default 0
+        Threshold in minutes.
+    seconds : int, default 0
+        Threshold in seconds.
+    dt_interval : DTInterval, optional
+        An alternative to directly specifying time components.
+        This should be an instance of DTInterval, which provides
+        the `to_timedelta` property that converts structured time
+        descriptions into a timedelta object.
+    refresh_day_num : int, optional
+        The weekday number (0=Monday, 6=Sunday) on which
+        data should be refreshed if it has not already. Enforces
+        a maximum period limit of 6 days, 23 hours, 59 minutes and 59 seconds.
+
+    Returns
+    -------
+    bool
+        True if data is considered stale by exceeding the defined time limits or if the current
+        day equals `refresh_day_num`. False otherwise.
+
+    Raises
+    ------
+    ValueError
+        If neither time components nor `dt_interval` are provided.
+        If `refresh_day_num` is not between 0 and 6.
+        If the total period is 7 days or more when `refresh_day_num` is set.
+
+    Examples
+    --------
+    Assume now is January 31st, 2025 (a Friday), 12:00:00, and the table was last
+    modified on January 28th, 2025 at 11:00:00, i.e. 3 days and 1 hour ago.
+
+    Example 1: checking with a 3-day threshold:
+    ```
+    is_stale = is_data_stale(table_name, days=3)
+    print(is_stale)  # True, as 3 days and 1 hour is more than the 3-day threshold.
+    ```
+
+    Example 2: checking with a 3-day and 1-hour threshold:
+    ```
+    is_stale = is_data_stale(table_name, days=3, hours=1)
+    print(is_stale)  # True, as 3 days and 1 hour is exactly equal to the threshold.
+    ```
+
+    Example 3: checking with a 3-day and 2-hour threshold:
+    ```
+    is_stale = is_data_stale(table_name, days=3, hours=2)
+    print(is_stale)  # False, as 3 days and 1 hour is less than 3 days and 2 hours.
+    ```
+
+    Example 4: checking with a 5-day threshold and refresh_day_num=4 (Friday):
+    ```
+    is_stale = is_data_stale(table_name, days=5, refresh_day_num=4)
+    print(is_stale)  # True: 3 days and 1 hour is less than 5 days, but today matches refresh_day_num.
+    ```
+    """
+
+    if not any((months, weeks, days, hours, minutes, seconds)) and dt_interval is None:
+        raise ValueError(
+            "You must provide either time components (months, weeks, days, hours, minutes, seconds) or dt_interval."
+        )
+
+    if months > 0:  # Convert months to days
+        month_days = int(months * 30.44)  # Average month length
+        days += month_days
+        log.debug(f"Converted {months} months to {month_days} days.")
+
+    staleness_period = (
+        timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)
+        if any((weeks, days, hours, minutes, seconds))
+        else dt_interval.to_timedelta
+    )
+
+    if refresh_day_num is not None:
+        if not 0 <= refresh_day_num <= 6:
+            raise ValueError("refresh_day_num should be between 0 (Monday) and 6 (Sunday).")
+
+        max_period = timedelta(days=6, hours=23, minutes=59, seconds=59)
+        if staleness_period > max_period:
+            raise ValueError("With refresh_day_num set, the total period must be less than 7 days.")
+
+    current_time = datetime.now()
+
+    # Get the history of the Delta table
+    history_df = DeltaTableStep(table=table).describe_history()
+
+    if history_df is None:
+        log.debug(f"No history found for `{table}`.")
+        return True  # Consider data stale if the table does not exist
+
+    modification_operations = [
+        "WRITE",
+        "MERGE",
+        "DELETE",
+        "UPDATE",
+        "REPLACE TABLE AS SELECT",
+        "CREATE TABLE AS SELECT",
+        "TRUNCATE",
+        "RESTORE",
+    ]
+
+    # Filter the history to data modification operations only
+    history_df = history_df.filter(history_df["operation"].isin(modification_operations))
+
+    # Get the last modification operation's timestamp
+    last_modification = history_df.select("timestamp").first()
+
+    if last_modification is None:
+        log.debug(f"No modification operation found in the history for `{table}`.")
+        return True
+
+    last_modification_timestamp = last_modification["timestamp"]
+
+    cut_off_date = current_time - staleness_period
+
+    log.debug(f"Last modification timestamp: {last_modification_timestamp}, cut-off date: {cut_off_date}")
+
+    is_stale_by_time = last_modification_timestamp <= cut_off_date
+
+    if refresh_day_num is not None:
+        current_day_of_week = current_time.weekday()
+        log.debug(f"Current day of the week: {current_day_of_week}, refresh day: {refresh_day_num}")
+
+        is_appropriate_day_for_refresh = current_day_of_week == refresh_day_num
+        return is_stale_by_time or is_appropriate_day_for_refresh
+
+    return is_stale_by_time
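A hedged usage sketch of `is_data_stale` (table name hypothetical; assumes an active Spark session with Delta support):

```python
from koheesio.spark.delta.utils import is_data_stale
from koheesio.utils.date_time import DTInterval

# Stale if the last data-modifying operation is more than 2 days, 6 hours old.
stale = is_data_stale("my_schema.my_table", days=2, hours=6)

# Same threshold expressed as a DTInterval, plus a forced weekly refresh:
# report stale every Monday (0) even if the data is still within the threshold.
stale_or_monday = is_data_stale(
    "my_schema.my_table",
    dt_interval=DTInterval(interval="2 days 6 hours"),
    refresh_day_num=0,
)
```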
+ ) + + if months > 0: # Convert months to days + month_days = int(months * 30.44) # Average month length + days += month_days + log.debug(f"Converted {months} months to {month_days} days.") + + staleness_period = ( + timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds) + if any((weeks, days, hours, minutes, seconds)) + else dt_interval.to_timedelta + ) + + if refresh_day_num is not None: + if not 0 <= refresh_day_num <= 6: + raise ValueError("refresh_day_num should be between 0 (Monday) and 6 (Sunday).") + + max_period = timedelta(days=6, hours=23, minutes=59, seconds=59) + if staleness_period > max_period: + raise ValueError("With refresh_day_num set, the total period must be less than 7 days.") + + current_time = datetime.now() + + # Get the history of the Delta table + history_df = DeltaTableStep(table=table).describe_history() + + if not history_df: + log.debug(f"No history found for `{table}`.") + return True # Consider data stale if the table does not exist + + modification_operations = [ + "WRITE", + "MERGE", + "DELETE", + "UPDATE", + "REPLACE TABLE AS SELECT", + "CREATE TABLE AS SELECT", + "TRUNCATE", + "RESTORE", + ] + + # Filter the history to data modification operations only + history_df = history_df.filter(history_df["operation"].isin(modification_operations)) + + # Get the last modification operation's timestamp + last_modification = history_df.select("timestamp").first() + + if not last_modification: + log.debug(f"No modification operation found in the history for `{table}`.") + return True + + last_modification_timestamp = last_modification["timestamp"] + + cut_off_date = current_time - staleness_period + + log.debug(f"Last modification timestamp: {last_modification_timestamp}, cut-off date: {cut_off_date}") + + is_stale_by_time = last_modification_timestamp <= cut_off_date + + if refresh_day_num is not None: + current_day_of_week = current_time.weekday() + log.debug(f"Current day of the week: {current_day_of_week}, refresh day: {refresh_day_num}") + + is_appropriate_day_for_refresh = current_day_of_week == refresh_day_num + return is_stale_by_time or is_appropriate_day_for_refresh + + return is_stale_by_time diff --git a/src/koheesio/spark/writers/delta/batch.py b/src/koheesio/spark/writers/delta/batch.py index 823ecbd1..f3d29cd4 100644 --- a/src/koheesio/spark/writers/delta/batch.py +++ b/src/koheesio/spark/writers/delta/batch.py @@ -385,7 +385,11 @@ def execute(self) -> Writer.Output: if self.table.create_if_not_exists and not self.table.exists: _writer = _writer.options(**self.table.default_create_properties) - + message = ( + f"Table `{self.table}` doesn't exist. The `create_if_not_exists` flag is set to True. " + "Therefore the table will be created." 
diff --git a/src/koheesio/utils/date_time.py b/src/koheesio/utils/date_time.py
new file mode 100644
index 00000000..6b6281ac
--- /dev/null
+++ b/src/koheesio/utils/date_time.py
@@ -0,0 +1,120 @@
+"""
+Utility functions related to date and time operations
+"""
+from typing import Optional
+from datetime import timedelta
+import re
+
+from koheesio.models import BaseModel, model_validator
+
+extract_dt_interval = re.compile(
+    r"""
+    (?P<years>\d+)\s+years?\s*|
+    (?P<months>\d+)\s+months?\s*|
+    (?P<weeks>\d+)\s+weeks?\s*|
+    (?P<days>\d+)\s+days?\s*|
+    (?P<hours>\d+)\s+hours?\s*|
+    (?P<minutes>\d+)\s+(?:minutes?|mins?)\s*|
+    (?P<seconds>\d+)\s+(?:seconds?|secs?)\s*|
+    (?P<milliseconds>\d+)\s+(?:milliseconds?|millis?)\s*|
+    (?P<microseconds>\d+)\s+(?:microseconds?|micros?)\s*
+""",
+    re.VERBOSE,
+)
+
+
+class DTInterval(BaseModel):
+    """
+    A class to define date and time intervals using human-readable strings or individual time components.
+
+    Parameters
+    ----------
+    interval : str, optional
+        A human-readable string specifying the duration of the interval, broken down
+        into years, months, weeks, days, hours, minutes, seconds, milliseconds, and microseconds.
+    years : int, optional, default 0
+        Number of years in the interval.
+    months : int, optional, default 0
+        Number of months in the interval.
+    weeks : int, optional, default 0
+        Number of weeks in the interval.
+    days : int, optional, default 0
+        Number of days in the interval.
+    hours : int, optional, default 0
+        Number of hours in the interval.
+    minutes : int, optional, default 0
+        Number of minutes in the interval.
+    seconds : int, optional, default 0
+        Number of seconds in the interval.
+    milliseconds : int, optional, default 0
+        Number of milliseconds in the interval.
+    microseconds : int, optional, default 0
+        Number of microseconds in the interval.
+
+    Examples
+    --------
+    Creating an instance with time components:
+
+    ```
+    print(DTInterval(years=2, weeks=3, hours=12).to_timedelta)
+    ```
+    751 days, 12:00:00
+
+    Creating an instance from a string:
+    ```
+    print(
+        DTInterval(
+            interval="1 year 2 months 3 weeks 4 days 5 hours 100 minutes 200 seconds 300 milliseconds 400 microseconds"
+        ).to_timedelta
+    )
+    ```
+    451 days, 6:43:20.300400
+
+    Methods
+    -------
+    to_timedelta
+        Property that converts the DTInterval instance to a timedelta object, aggregating all specified time
+        components.
+    """
+
+    interval: Optional[str] = None
+    years: Optional[int] = 0  # converted to days (x 365.25, the average year length) by `calculate_days`
+    months: Optional[int] = 0  # converted to days (x 30.44, the average month length) by `calculate_days`
+    weeks: Optional[int] = 0
+    days: Optional[int] = 0
+    hours: Optional[int] = 0
+    minutes: Optional[int] = 0
+    seconds: Optional[int] = 0
+    milliseconds: Optional[int] = 0
+    microseconds: Optional[int] = 0
+
+    @model_validator(mode="before")
+    def process_interval(cls, values: dict) -> dict:
+        """Processes the input interval string and extracts the time components"""
+        if interval_value := values.get("interval"):
+            matches = extract_dt_interval.finditer(interval_value)
+
+            # update values with the extracted values
+            for match in matches:
+                for key, value in match.groupdict().items():
+                    if value:
+                        values[key] = int(value)
+        return values
+
+    @model_validator(mode="after")
+    def calculate_days(self) -> "DTInterval":
+        """Years and months are not supported in timedelta, so we need to convert them to days"""
+        if self.years or self.months:
+            year_month_days = int(self.years * 365.25 + self.months * 30.44)  # average year and month lengths
+            self.days += year_month_days
+            self.years = 0
+            self.months = 0
+        return self
+
+    @property
+    def to_timedelta(self) -> timedelta:
+        """Returns the object as a timedelta object"""
+        return timedelta(
+            days=self.days,
+            seconds=self.seconds,
+            microseconds=self.microseconds,
+            milliseconds=self.milliseconds,
+            minutes=self.minutes,
+            hours=self.hours,
+            weeks=self.weeks,
+        )
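For review, a small equivalence sketch of the two construction paths (plain asserts; assumes the package is importable):

```python
from koheesio.utils.date_time import DTInterval

# String form and keyword form produce the same timedelta.
a = DTInterval(interval="2 weeks 3 days 12 hours")
b = DTInterval(weeks=2, days=3, hours=12)
assert a.to_timedelta == b.to_timedelta  # 17 days, 12:00:00

# Years/months are normalised to days by the `calculate_days` validator,
# so to_timedelta never receives them directly.
print(DTInterval(interval="1 year").to_timedelta)  # 365 days, 0:00:00
```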
+ """ + + interval: Optional[str] = None + years: Optional[int] = 0 # = days * 365.25 (average year length, rounded up) + months: Optional[int] = 0 # = days * 30.44 (average month length, rounded up) + weeks: Optional[int] = 0 + days: Optional[int] = 0 + hours: Optional[int] = 0 + minutes: Optional[int] = 0 + seconds: Optional[int] = 0 + milliseconds: Optional[int] = 0 + microseconds: Optional[int] = 0 + + @model_validator(mode="before") + def process_interval(cls, values: dict) -> dict: + """Processes the input interval string and extracts the time components""" + if interval_value := values.get("interval"): + matches = extract_dt_interval.finditer(interval_value) + + # update values with the extracted values + for match in matches: + for key, value in match.groupdict().items(): + if value: + values[key] = int(value) + return values + + @model_validator(mode="after") + def calculate_days(self) -> "DTInterval": + """Years and months are not supported in timedelta, so we need to convert them to days""" + if self.years or self.months: + year_month_days = int(self.years * 365.25 + self.months * 30.44) # average year length + self.days += year_month_days + self.years = 0 + self.months = 0 + return self + + @property + def to_timedelta(self) -> timedelta: + """Returns the object as a timedelta object""" + return timedelta( + days=self.days, + seconds=self.seconds, + microseconds=self.microseconds, + milliseconds=self.milliseconds, + minutes=self.minutes, + hours=self.hours, + weeks=self.weeks, + ) diff --git a/tests/spark/test_delta.py b/tests/spark/delta/test_delta.py similarity index 70% rename from tests/spark/test_delta.py rename to tests/spark/delta/test_delta.py index 920d2b6b..cf352a14 100644 --- a/tests/spark/test_delta.py +++ b/tests/spark/delta/test_delta.py @@ -5,10 +5,12 @@ from conftest import setup_test_data import pytest +from chispa import assert_df_equality from pydantic import ValidationError from pyspark.sql.types import LongType +from pyspark.sql.utils import AnalysisException from koheesio.logger import LoggingFactory from koheesio.spark.delta import DeltaTableStep @@ -154,4 +156,49 @@ def test_exists(caplog, table, create_if_not_exists, log_level): dt = DeltaTableStep(table=table, create_if_not_exists=create_if_not_exists) dt.log.setLevel(log_level) assert dt.exists is False - assert f"The `create_if_not_exists` flag is set to {create_if_not_exists}." 
diff --git a/tests/spark/delta/test_delta_utils.py b/tests/spark/delta/test_delta_utils.py
new file mode 100644
index 00000000..b8d4eee1
--- /dev/null
+++ b/tests/spark/delta/test_delta_utils.py
@@ -0,0 +1,172 @@
+from datetime import datetime
+
+from freezegun import freeze_time
+import pytest
+
+from koheesio.spark.delta.utils import is_data_stale
+from koheesio.utils.date_time import DTInterval
+
+
+@pytest.fixture
+def test_history_df(spark):
+    return spark.createDataFrame(
+        [
+            {
+                "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=29, second=30),
+                "operation": "WRITE",
+                "version": 1,
+            },
+            {
+                "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=28, second=30),
+                "operation": "CREATE TABLE",
+                "version": 0,
+            },
+        ]
+    )
+
+
+# With time frozen at 2024-12-31 12:00:00, the last WRITE is 1 day, 6 hours, 30 minutes and 30 seconds old.
+@pytest.mark.parametrize(
+    "months, weeks, days, hours, minutes, seconds, expected",
+    [
+        (1, 0, 0, 0, 0, 0, False),  # Not stale: the 1-month threshold far exceeds the age
+        (0, 1, 0, 0, 0, 0, False),  # Not stale: the 1-week threshold exceeds the age
+        (0, 0, 2, 0, 0, 0, False),  # Not stale: the 2-day threshold exceeds the age
+        (0, 0, 1, 7, 0, 0, False),  # Not stale: threshold is 29 minutes 30 seconds longer than the age
+        (0, 0, 1, 6, 31, 0, False),  # Not stale: threshold is 30 seconds longer than the age
+        (0, 0, 1, 6, 30, 31, False),  # Not stale: threshold is 1 second longer than the age
+        (0, 0, 1, 6, 30, 30, True),  # Exactly equal to the age, considered stale
+        (0, 0, 1, 6, 30, 29, True),  # Stale: threshold falls 1 second short of the age
+        (0, 0, 1, 0, 0, 0, True),  # Stale: threshold falls 6.5 hours short of the age
+        (0, 0, 0, 18, 0, 0, True),  # Stale: the 18-hour threshold is well below the age
+    ],
+)
+@freeze_time("2024-12-31 12:00:00")
+def test_is_data_stale__no_refresh_day_num_with_time_components(
+    months, weeks, days, hours, minutes, seconds, expected, test_history_df, mocker
+):
+    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df)
+
+    assert (
+        is_data_stale(
+            "dummy_table", months=months, weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds
+        )
+        == expected
+    )
+
+
+@pytest.mark.parametrize(
+    "dt_interval, expected",
+    [
+        (DTInterval(interval="2 days"), False),  # Not stale: the 2-day threshold exceeds the age
+        (DTInterval(interval="1 day, 7 hours"), False),  # Not stale: 29 minutes 30 seconds longer than the age
+        (DTInterval(interval="1 day, 6 hours, 31 minutes"), False),  # Not stale: 30 seconds longer than the age
+        (DTInterval(interval="1 day, 6 hours, 30 minutes, 31 seconds"), False),  # Not stale: 1 second longer than the age
+        (DTInterval(interval="1 day, 6 hours, 30 minutes, 30 seconds"), True),  # Exactly equal to the age, considered stale
+        (DTInterval(interval="1 day, 6 hours, 30 minutes, 29 seconds"), True),  # Stale: falls 1 second short of the age
+        (DTInterval(interval="1 day"), True),  # Stale: falls 6.5 hours short of the age
+        (DTInterval(interval="18 hours"), True),  # Stale: the 18-hour threshold is well below the age
+    ],
+)
+@freeze_time("2024-12-31 12:00:00")
+def test_is_data_stale__no_refresh_day_num_with_dt_interval(dt_interval, expected, test_history_df, mocker):
+    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df)
+
+    assert is_data_stale("dummy_table", dt_interval=dt_interval) == expected
+
+
+def test_is_data_stale__no_table(mocker):
+    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=None)
+
+    assert is_data_stale("dummy_table", days=1)
+
+
+def test_is_data_stale__no_modification_history(mocker, spark):
+    history_df = spark.createDataFrame(
+        [
+            {
+                "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=28, second=30),
+                "operation": "CREATE TABLE",
+            }
+        ]
+    )
+
+    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=history_df)
+
+    assert is_data_stale("dummy_table", days=1)
+
+
+@pytest.mark.parametrize(
+    "interval, refresh_day_num, expected",
+    [
+        (
+            DTInterval(interval="2 days, 6 hours, 30 minutes, 30 seconds"),
+            2,
+            False,
+        ),  # Data is not stale and it's before the refresh day
+        (
+            DTInterval(interval="2 days, 6 hours, 30 minutes, 30 seconds"),
+            1,
+            True,
+        ),  # Data is not stale but it is the refresh day
+        (
+            DTInterval(interval="2 days, 6 hours, 30 minutes, 30 seconds"),
+            0,
+            False,
+        ),  # Data is not stale and it is past the refresh day
+        (
+            DTInterval(interval="6 hours, 30 minutes, 30 seconds"),
+            2,
+            True,
+        ),  # Data is stale and it's before the refresh day
+        (
+            DTInterval(interval="6 hours, 30 minutes, 30 seconds"),
+            1,
+            True,
+        ),  # Data is stale and it is the refresh day
+        (
+            DTInterval(interval="6 hours, 30 minutes, 30 seconds"),
+            0,
+            True,
+        ),  # Data is stale and it is past the refresh day
+    ],
+)
+@freeze_time("2024-12-31 12:00:00")  # Tuesday
+def test_is_data_stale__with_refresh_day(
+    interval, refresh_day_num, expected,
+    test_history_df, mocker
+):
+    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df)
+
+    assert is_data_stale("dummy_table", dt_interval=interval, refresh_day_num=refresh_day_num) == expected
+
+
+def test_is_data_stale__missing_input():
+    with pytest.raises(ValueError):
+        is_data_stale("dummy_table")
+
+
+def test_is_data_stale__invalid_refresh_day():
+    with pytest.raises(ValueError):
+        is_data_stale("dummy_table", dt_interval=DTInterval(interval="1 day"), refresh_day_num=7)
+
+
+def test_is_data_stale__invalid_staleness_period_with_refresh_day():
+    with pytest.raises(ValueError):
+        is_data_stale("dummy_table", dt_interval=DTInterval(interval="1 month"), refresh_day_num=5)
diff --git a/tests/utils/test_utils.py b/tests/utils/test_common_utils.py
similarity index 100%
rename from tests/utils/test_utils.py
rename to tests/utils/test_common_utils.py
diff --git a/tests/utils/test_date_time_utils.py b/tests/utils/test_date_time_utils.py
new file mode 100644
index 00000000..ee8141b9
--- /dev/null
+++ b/tests/utils/test_date_time_utils.py
@@ -0,0 +1,55 @@
+from datetime import timedelta
+
+import pytest
+
+from koheesio.utils.date_time import DTInterval
+
+
+@pytest.mark.parametrize(
+    "input,expected",
+    [
+        (
+            # check with all values
+            "5 years 3 months 2 weeks 4 days 10 hours 42 minutes 4 seconds 20 milliseconds 200 microseconds",
+            timedelta(
+                days=int(4 + 14 + 3 * 30.44 + 5 * 365.25),
+                seconds=4,
+                microseconds=200,
+                milliseconds=20,
+                minutes=42,
+                hours=10,
+            ),
+        ),
+        (
+            # check with aliases
+            "7 years 4 months 1 weeks 3 days 8 hours 15 mins 30 secs 50 millis 100 micros",
+            timedelta(
+                days=int(3 + 7 + 4 * 30.44 + 7 * 365.25),
+                seconds=30,
+                microseconds=100,
+                milliseconds=50,
+                minutes=15,
+                hours=8,
+            ),
+        ),
+        (
+            # check with singular values
+            "1 year 1 month 1 week 1 day 1 hour 1 minute 1 second 1 millisecond 1 microsecond",
+            timedelta(days=395 + 8, seconds=1, microseconds=1, milliseconds=1, minutes=1, hours=1),
+        ),
+        (
+            # check with singular alias values
+            "1 min 1 sec 1 milli 1 micro",
+            timedelta(seconds=1, microseconds=1, milliseconds=1, minutes=1),
+        ),
+        (
+            # test with week value
+            "1 week",
+            timedelta(weeks=1),
+        ),
+        ("5 years", timedelta(days=int(365.25 * 5))),
+    ],
+)
+def test_dt_interval(input, expected):
+    dt_interval = DTInterval(interval=input)
+    assert dt_interval.to_timedelta == expected
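A quick worked check of the truncation these expectations rely on (a sketch; the values follow from `calculate_days`):

```python
from datetime import timedelta

from koheesio.utils.date_time import DTInterval

# `calculate_days` truncates the averaged day count with int():
# 5 years -> int(5 * 365.25) = int(1826.25) = 1826 days
assert DTInterval(interval="5 years").to_timedelta == timedelta(days=1826)

# Years and months are summed before the single int() truncation:
# 1 year 1 month -> int(365.25 + 30.44) = int(395.69) = 395 days
assert DTInterval(interval="1 year 1 month").to_timedelta == timedelta(days=395)
```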