From 056359a7160886441b6afa112d46e54e0e6fb518 Mon Sep 17 00:00:00 2001
From: zarembat
Date: Fri, 31 Jan 2025 14:03:59 +0100
Subject: [PATCH 1/6] [FEATURE] - Added describe_history() method to
 DeltaTableStep, enabling fetching of Delta table history as a Spark
 DataFrame. - Added is_data_stale() method to assess if data in a specified
 table is stale based on defined time intervals or a specific refresh day.
 - Added DTInterval class for defining date and time intervals.

---
 pyproject.toml                               |   1 +
 .../spark/{delta.py => delta/__init__.py}    |  40 ++++
 src/koheesio/spark/delta/utils.py            | 127 +++++++++++++
 src/koheesio/{utils.py => utils/__init__.py} |   0
 src/koheesio/utils/date_time.py              |  75 ++++++++
 tests/spark/{ => delta}/test_delta.py        |  54 ++++++
 tests/spark/delta/test_delta_utils.py        | 172 ++++++++++++++++++
 .../{test_utils.py => test_common_utils.py}  |   0
 tests/utils/test_date_time_utils.py          |  55 ++++++
 9 files changed, 524 insertions(+)
 rename src/koheesio/spark/{delta.py => delta/__init__.py} (90%)
 create mode 100644 src/koheesio/spark/delta/utils.py
 rename src/koheesio/{utils.py => utils/__init__.py} (100%)
 create mode 100644 src/koheesio/utils/date_time.py
 rename tests/spark/{ => delta}/test_delta.py (70%)
 create mode 100644 tests/spark/delta/test_delta_utils.py
 rename tests/utils/{test_utils.py => test_common_utils.py} (100%)
 create mode 100644 tests/utils/test_date_time_utils.py

diff --git a/pyproject.toml b/pyproject.toml
index c56c7b78..89641a39 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -80,6 +80,7 @@ test = [
     "pytest-xdist",
     "requests_mock",
     "time_machine",
+    "freezegun",
 ]
 docs = [
     "markdown>=3.5.2",
diff --git a/src/koheesio/spark/delta.py b/src/koheesio/spark/delta/__init__.py
similarity index 90%
rename from src/koheesio/spark/delta.py
rename to src/koheesio/spark/delta/__init__.py
index 70a084df..e11fb349 100644
--- a/src/koheesio/spark/delta.py
+++ b/src/koheesio/spark/delta/__init__.py
@@ -321,3 +321,43 @@ def exists(self) -> bool:
             raise e
 
         return result
+
+    def describe_history(self, limit: Optional[int] = None) -> Optional[DataFrame]:
+        """
+        Get the latest `limit` rows from the Delta Log.
+        The information is in reverse chronological order.
+
+        Parameters
+        ----------
+        limit : Optional[int]
+            Number of rows to return.
+
+        Returns
+        -------
+        Optional[DataFrame]
+            Delta Table's history as a DataFrame or None if the table does not exist.
+
+        Examples
+        --------
+        ```python
+        DeltaTableStep(...).describe_history()
+        ```
+        Would return the full history from the Delta Log.
+
+        ```python
+        DeltaTableStep(...).describe_history(limit=10)
+        ```
+        Would return the last 10 operations from the Delta Log.
+        """
+        try:
+            history_df = self.spark.sql(f"DESCRIBE HISTORY {self.table_name}")
+            history_df = history_df.orderBy("version", ascending=False)
+            if limit:
+                history_df = history_df.limit(limit)
+            return history_df
+        except AnalysisException as e:
+            err_msg = str(e).lower()
+            if err_msg.startswith("[table_or_view_not_found]"):
+                self.log.warning(f"The table or view {self.table_name} does not exist.")
+            else:
+                raise e
diff --git a/src/koheesio/spark/delta/utils.py b/src/koheesio/spark/delta/utils.py
new file mode 100644
index 00000000..f621a823
--- /dev/null
+++ b/src/koheesio/spark/delta/utils.py
@@ -0,0 +1,127 @@
+"""
+Utils for working with Delta tables.
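+
+Currently this module exposes `is_data_stale`, a helper that inspects a table's
+Delta Log to decide whether its data has gone stale.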
+""" +from datetime import datetime, timedelta + +from koheesio.spark.delta import DeltaTableStep +from koheesio.utils.date_time import DTInterval + + +def is_data_stale( + table: DeltaTableStep, + months: int = 0, + weeks: int = 0, + days: int = 0, + hours: int = 0, + minutes: int = 0, + seconds: int = 0, + dt_interval: DTInterval = None, + refresh_day_num: int = None, +) -> bool: + """ + Determines if the data inside a table is stale based on the elapsed time since + the last modification and, optionally, based on the current week day. + + The function allows specifying limits in terms of months, weeks, days, hours, minutes, and seconds to determine + how old data can be before it is considered stale. If `refresh_day_num` is provided, it adds an extra condition + to mark data as stale if the current day matches with the specified weekday. + + The last modification date is taken from the Delta Log. + + Parameters + ---------- + table : str + The path to the table to check. + months : int, default 0 + Threshold in months to determine staleness. + weeks : int, default 0 + Threshold in weeks. + days : int, default 0 + Threshold in days. + hours : int, default 0 + Threshold in hours. + minutes : int, default 0 + Threshold in minutes. + seconds : int, default 0 + Threshold in seconds. + dt_interval : DTInterval, optional + An alternative to directly specifying time components. + This should be an instance of DTInterval, which provides + the `to_timedelta` method that converts structured time + descriptions into a timedelta object. + refresh_day_num : int, optional + The weekday number (0=Monday, 6=Sunday) on which + data should be refreshed if it has not already. Enforces + a maximum period limit of 6 days, 23 hours, 59 minutes and 59 seconds. + + Returns + ------- + bool + True if data is considered stale by exceeding the defined time limits or if the current + day equals to `refresh_day_num`. Returns False if conditions are not met. + """ + + if not any((months, weeks, days, hours, minutes, seconds)) and dt_interval is None: + raise ValueError( + "You must provide either time components (months, weeks, days, hours, minutes, seconds) or dt_interval." 
+ ) + + if months > 0: # Convert months to days + month_days = int(months * 30.44) # Average month length + days += month_days + + staleness_period = ( + timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds) + if any((weeks, days, hours, minutes, seconds)) + else dt_interval.to_timedelta + ) + + if refresh_day_num is not None: + if not 0 <= refresh_day_num <= 6: + raise ValueError("refresh_day_num should be between 0 (Monday) and 6 (Sunday).") + + max_period = timedelta(days=6, hours=23, minutes=59, seconds=59) + if staleness_period > max_period: + raise ValueError("With refresh_day_num set, the total period must be less than 7 days.") + + current_time = datetime.now() + + # Get the history of the Delta table + history_df = DeltaTableStep(table=table).describe_history() + + if not history_df: + return True # Consider data stale if no history exists + + modification_operations = [ + "WRITE", + "MERGE", + "DELETE", + "UPDATE", + "REPLACE TABLE AS SELECT", + "CREATE TABLE AS SELECT", + "TRUNCATE", + "RESTORE", + ] + + # Filter the history to data modification operations only + history_df = history_df.filter(history_df["operation"].isin(modification_operations)) + + # Get the last modification operation's timestamp + last_modification = history_df.select("timestamp").first() + + if not last_modification: + return True # No modification operation found in the history + + last_modification_timestamp = last_modification["timestamp"] + + cut_off_date = current_time - staleness_period + + is_stale_by_time = last_modification_timestamp <= cut_off_date + + if refresh_day_num is not None: + current_day_of_week = current_time.weekday() + + is_appropriate_day_for_refresh = current_day_of_week == refresh_day_num + return is_stale_by_time or is_appropriate_day_for_refresh + + return is_stale_by_time diff --git a/src/koheesio/utils.py b/src/koheesio/utils/__init__.py similarity index 100% rename from src/koheesio/utils.py rename to src/koheesio/utils/__init__.py diff --git a/src/koheesio/utils/date_time.py b/src/koheesio/utils/date_time.py new file mode 100644 index 00000000..fc608128 --- /dev/null +++ b/src/koheesio/utils/date_time.py @@ -0,0 +1,75 @@ +""" +Utility functions related to date and time operations +""" +from typing import Optional +from datetime import timedelta +import re + +from koheesio.models import BaseModel, model_validator + +extract_dt_interval = re.compile( + r""" + (?P\d+)\s+years?\s*| + (?P\d+)\s+months?\s*| + (?P\d+)\s+weeks?\s*| + (?P\d+)\s+days?\s*| + (?P\d+)\s+hours?\s*| + (?P\d+)\s+(?:minutes?|mins?)\s*| + (?P\d+)\s+(?:seconds?|secs?)\s*| + (?P\d+)\s+(?:milliseconds?|millis?)\s*| + (?P\d+)\s+(?:microseconds?|micros?)\s* +""", + re.VERBOSE, +) + + +class DTInterval(BaseModel): + """ + A class to define date and time intervals using human-readable strings or individual time components. 
+ """ + interval: Optional[str] = None + years: int = 0 # = days * 365.25 (average year length, rounded up) + months: int = 0 # = days * 30.44 (average month length, rounded up) + weeks: int = 0 + days: int = 0 + hours: int = 0 + minutes: int = 0 + seconds: int = 0 + milliseconds: int = 0 + microseconds: int = 0 + + @model_validator(mode="before") + def process_interval(cls, values: dict) -> dict: + """Processes the input interval string and extracts the time components""" + if interval_value := values.get("interval"): + matches = extract_dt_interval.finditer(interval_value) + + # update values with the extracted values + for match in matches: + for key, value in match.groupdict().items(): + if value: + values[key] = int(value) + return values + + @model_validator(mode="after") + def calculate_days(self) -> "DTInterval": + """Years and months are not supported in timedelta, so we need to convert them to days""" + if self.years or self.months: + year_month_days = int(self.years * 365.25 + self.months * 30.44) # average year length + self.days += year_month_days + self.years = 0 + self.months = 0 + return self + + @property + def to_timedelta(self) -> timedelta: + """Returns the object as a timedelta object""" + return timedelta( + days=self.days, + seconds=self.seconds, + microseconds=self.microseconds, + milliseconds=self.milliseconds, + minutes=self.minutes, + hours=self.hours, + weeks=self.weeks, + ) diff --git a/tests/spark/test_delta.py b/tests/spark/delta/test_delta.py similarity index 70% rename from tests/spark/test_delta.py rename to tests/spark/delta/test_delta.py index 920d2b6b..4d09ad53 100644 --- a/tests/spark/test_delta.py +++ b/tests/spark/delta/test_delta.py @@ -5,10 +5,12 @@ from conftest import setup_test_data import pytest +from chispa import assert_df_equality from pydantic import ValidationError from pyspark.sql.types import LongType +from pyspark.sql.utils import AnalysisException from koheesio.logger import LoggingFactory from koheesio.spark.delta import DeltaTableStep @@ -155,3 +157,55 @@ def test_exists(caplog, table, create_if_not_exists, log_level): dt.log.setLevel(log_level) assert dt.exists is False assert f"The `create_if_not_exists` flag is set to {create_if_not_exists}." 
in caplog.text + + +@pytest.fixture +def test_history_df(spark): + data = [ + {"version": 0, "timestamp": "2024-12-30 12:00:00", "tableName": "test_table", "operation": "CREATE TABLE"}, + {"version": 1, "timestamp": "2024-12-31 05:29:30", "tableName": "test_table", "operation": "WRITE"}, + {"version": 2, "timestamp": "2025-01-01 11:12:19", "tableName": "test_table", "operation": "MERGE"}, + ] + return spark.createDataFrame(data) + + +def test_describe_history__no_limit(mocker, spark, test_history_df): + dt = DeltaTableStep(table="test_table") + mocker.patch.object(spark, "sql", return_value=test_history_df) + result = dt.describe_history() + expected_df = spark.createDataFrame( + [ + {"version": 2, "timestamp": "2025-01-01 11:12:19", "tableName": "test_table", "operation": "MERGE"}, + {"version": 1, "timestamp": "2024-12-31 05:29:30", "tableName": "test_table", "operation": "WRITE"}, + {"version": 0, "timestamp": "2024-12-30 12:00:00", "tableName": "test_table", "operation": "CREATE TABLE"}, + ] + ) + assert_df_equality(result, expected_df, ignore_column_order=True) + + +def test_describe_history__with_limit(mocker, spark, test_history_df): + dt = DeltaTableStep(table="test_table") + mocker.patch.object(spark, "sql", return_value=test_history_df) + result = dt.describe_history(limit=1) + expected_df = spark.createDataFrame( + [ + {"version": 2, "timestamp": "2025-01-01 11:12:19", "tableName": "test_table", "operation": "MERGE"}, + ] + ) + assert_df_equality(result, expected_df, ignore_column_order=True) + + +def test_describe_history__no_table(mocker, spark): + dt = DeltaTableStep(table="test_table") + mocker.patch.object(spark, "sql", side_effect=AnalysisException("[TABLE_OR_VIEW_NOT_FOUND]")) + result = dt.describe_history() + + assert result is None + + +def test_describe_history__error(mocker, spark): + dt = DeltaTableStep(table="test_table") + mocker.patch.object(spark, "sql", side_effect=AnalysisException("Some other error")) + + with pytest.raises(AnalysisException): + dt.describe_history() diff --git a/tests/spark/delta/test_delta_utils.py b/tests/spark/delta/test_delta_utils.py new file mode 100644 index 00000000..7c82c866 --- /dev/null +++ b/tests/spark/delta/test_delta_utils.py @@ -0,0 +1,172 @@ +from datetime import datetime + +import pytest +from freezegun import freeze_time + +from koheesio.utils.date_time import DTInterval +from koheesio.spark.delta.utils import is_data_stale + + +@pytest.fixture +def test_history_df(spark): + return spark.createDataFrame( + [ + {"timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=29, second=30), "operation": "WRITE", "version": 1}, + { + "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=28, second=30), + "operation": "CREATE TABLE", + "version": 0 + }, + ] + ) + + +@pytest.mark.parametrize( + "months, weeks, days, hours, minutes, seconds, expected", + [ + (1, 0, 0, 0, 0, 0, False), # Not stale, since 1 month > 1 day and some hours ago + (0, 1, 0, 0, 0, 0, False), # Not stale, since 1 week > 1 day and some hours ago + (0, 0, 2, 0, 0, 0, False), # Not stale, since 2 days > 1 day and some hours ago + (0, 0, 1, 7, 0, 0, False), # Not stale, since 1 day and 7 hours > 1 day, 6 hours, 30 minutes + (0, 0, 1, 6, 31, 0, False), # Not stale, since 1 minute more than the timestamp age + (0, 0, 1, 6, 30, 31, False), # Not stale, since 1 second more than the timestamp age + (0, 0, 1, 6, 30, 30, True), # Exactly equal to the age, should be considered stale + (0, 0, 1, 6, 30, 29, True), # Stale, falls 1 second short + (0, 0, 
1, 0, 0, 0, True), # Stale, falls several hours and minutes short + ( + 0, + 0, + 0, + 18, + 0, + 0, + True, + ), # Stale, despite being more than half a day, but less than the full duration since the timestamp + ], +) +@freeze_time("2024-12-31 12:00:00") +def test_is_data_stale__no_refresh_day_num_with_time_components( + months, weeks, days, hours, minutes, seconds, expected, test_history_df, mocker +): + + mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df) + + assert ( + is_data_stale( + "dummy_table", months=months, weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds + ) + == expected + ) + + +@pytest.mark.parametrize( + "dt_interval, expected", + [ + (DTInterval(interval="2 days"), False), # Not stale, since 2 days > 1 day and some hours ago + ( + DTInterval(interval="1 day, 7 hours"), + False, + ), # Not stale, since 1 day and 7 hours > 1 day, 6 hours, 30 minutes + ( + DTInterval(interval="1 day, 6 hours, 31 minutes"), + False, + ), # Not stale, since 1 minute more than the timestamp age + ( + DTInterval(interval="1 day, 6 hours, 30 minutes, 31 seconds"), + False, + ), # Not stale, since 1 second more than the timestamp age + ( + DTInterval(interval="1 day, 6 hours, 30 minutes, 30 seconds"), + True, + ), # Exactly equal to the age, should be considered stale + (DTInterval(interval="1 day, 6 hours, 30 minutes, 29 seconds"), True), # Stale, falls 1 second short + (DTInterval(interval="1 day"), True), # Stale, falls several hours and minutes short + ( + DTInterval(interval="18 hours"), + True, + ), # Stale, despite being more than half a day, but less than the full duration since the timestamp + ], +) +@freeze_time("2024-12-31 12:00:00") +def test_is_data_stale__no_refresh_day_num_with_dt_interval(dt_interval, expected, test_history_df, mocker): + + mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df) + + assert is_data_stale("dummy_table", dt_interval=dt_interval) == expected + + +def test_is_data_stale__no_table(mocker): + + mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=None) + + assert is_data_stale("dummy_table", days=1) + + +def test_is_data_stale__no_modification_history(mocker, spark): + + history_df = spark.createDataFrame( + [ + { + "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=28, second=30), + "operation": "CREATE TABLE", + } + ] + ) + + mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=history_df) + + assert is_data_stale("dummy_table", days=1) + + +@pytest.mark.parametrize( + "interval, refresh_day_num, expected", + [ + ( + DTInterval(interval="2 days, 6 hours, 30 minutes, 30 seconds"), + 2, + False, + ), # Data is not stale and it's before the refresh day + ( + DTInterval(interval="2 days, 6 hours, 30 minutes, 30 seconds"), + 1, + True, + ), # Data is not stale but it is the refresh day + ( + DTInterval(interval="2 days, 6 hours, 30 minutes, 30 seconds"), + 0, + False, + ), # Data is not stale and it is past the refresh day + ( + DTInterval(interval="6 hours, 30 minutes, 30 seconds"), + 2, + True, + ), # Data is stale and it's before the refresh day + (DTInterval(interval="6 hours, 30 minutes, 30 seconds"), 1, True), # Data is stale and it is the refresh day + ( + DTInterval(interval="6 hours, 30 minutes, 30 seconds"), + 0, + True, + ), # Data is stale and it is past the refresh day + ], +) +@freeze_time("2024-12-31 12:00:00") # Tuesday +def 
test_is_data_stale__with_refresh_day(interval, refresh_day_num, expected, test_history_df, mocker): + + mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df) + + assert is_data_stale("dummy_table", dt_interval=interval, refresh_day_num=refresh_day_num) == expected + + +def test_is_data_stale__missing_input(): + with pytest.raises(ValueError): + is_data_stale("dummy_table") + + +def test_is_data_stale__invalid_refresh_day(): + with pytest.raises(ValueError): + is_data_stale("dummy_table", dt_interval=DTInterval(interval="1 day"), refresh_day_num=7) + + +def test_is_data_stale__invalid_staleness_period_with_refresh_day(): + with pytest.raises(ValueError): + is_data_stale("dummy_table", dt_interval=DTInterval(interval="1 month"), refresh_day_num=5) diff --git a/tests/utils/test_utils.py b/tests/utils/test_common_utils.py similarity index 100% rename from tests/utils/test_utils.py rename to tests/utils/test_common_utils.py diff --git a/tests/utils/test_date_time_utils.py b/tests/utils/test_date_time_utils.py new file mode 100644 index 00000000..ee8141b9 --- /dev/null +++ b/tests/utils/test_date_time_utils.py @@ -0,0 +1,55 @@ +from datetime import timedelta + +import pytest + +from koheesio.utils.date_time import DTInterval + + +@pytest.mark.parametrize( + "input,expected", + [ + ( + # check with all values + "5 years 3 months 2 weeks 4 days 10 hours 42 minutes 4 seconds 20 milliseconds 200 microseconds", + timedelta( + days=int(4 + 14 + 3 * 30.44 + 5 * 365.25), + seconds=4, + microseconds=200, + milliseconds=20, + minutes=42, + hours=10, + ), + ), + ( + # check with aliases + "7 years 4 months 1 weeks 3 days 8 hours 15 mins 30 secs 50 millis 100 micros", + timedelta( + days=int(3 + 7 + 4 * 30.44 + 7 * 365.25), + seconds=30, + microseconds=100, + milliseconds=50, + minutes=15, + hours=8, + ), + ), + ( + # check with singular values + "1 year 1 month 1 week 1 day 1 hour 1 minute 1 second 1 millisecond 1 microsecond", + timedelta(days=395 + 8, seconds=1, microseconds=1, milliseconds=1, minutes=1, hours=1), + ), + ( + # check with singular alias values + "1 min 1 sec 1 milli 1 micro", + timedelta(seconds=1, microseconds=1, milliseconds=1, minutes=1), + ), + ( + # test with week value + "1 week", + timedelta(weeks=1), + ), + ("5 years", timedelta(days=int(365.25 * 5))), + ], +) +def test_dt_interval(input, expected): + dt_interval = DTInterval(interval=input) + assert dt_interval.to_timedelta == expected From 1c5ac9cb08cee8871f3eb042d6a10e9c84c13f5e Mon Sep 17 00:00:00 2001 From: zarembat Date: Fri, 31 Jan 2025 16:58:56 +0100 Subject: [PATCH 2/6] Added some examples to the docstrings, used the exists() method inside describe_history() and added some log messages for debugging --- src/koheesio/spark/delta/__init__.py | 20 ++------- src/koheesio/spark/delta/utils.py | 42 +++++++++++++++++- src/koheesio/utils/date_time.py | 63 +++++++++++++++++++++++---- tests/spark/delta/test_delta_utils.py | 4 +- 4 files changed, 100 insertions(+), 29 deletions(-) diff --git a/src/koheesio/spark/delta/__init__.py b/src/koheesio/spark/delta/__init__.py index e11fb349..eb527192 100644 --- a/src/koheesio/spark/delta/__init__.py +++ b/src/koheesio/spark/delta/__init__.py @@ -307,16 +307,8 @@ def exists(self) -> bool: result = True except AnalysisException as e: err_msg = str(e).lower() - common_message = ( - f"Table `{self.table}` doesn't exist. " - f"The `create_if_not_exists` flag is set to {self.create_if_not_exists}." 
- ) - if err_msg.startswith("[table_or_view_not_found]") or err_msg.startswith("table or view not found"): - if self.create_if_not_exists: - self.log.info(" ".join((common_message, "Therefore the table will be created."))) - else: - self.log.error(" ".join((common_message, "Therefore the table will not be created."))) + self.log.debug(f"Table `{self.table_name}` does not exist.") else: raise e @@ -349,15 +341,11 @@ def describe_history(self, limit: Optional[int] = None) -> Optional[DataFrame]: ``` Would return the last 10 operations from the Delta Log. """ - try: + if self.exists: history_df = self.spark.sql(f"DESCRIBE HISTORY {self.table_name}") history_df = history_df.orderBy("version", ascending=False) if limit: history_df = history_df.limit(limit) return history_df - except AnalysisException as e: - err_msg = str(e).lower() - if err_msg.startswith("[table_or_view_not_found]"): - self.log.warning(f"The table or view {self.table_name} does not exist.") - else: - raise e + else: + self.log.warning(f"Table `{self.table_name}` does not exist.") diff --git a/src/koheesio/spark/delta/utils.py b/src/koheesio/spark/delta/utils.py index f621a823..ab90c2d0 100644 --- a/src/koheesio/spark/delta/utils.py +++ b/src/koheesio/spark/delta/utils.py @@ -3,9 +3,12 @@ """ from datetime import datetime, timedelta +from koheesio.logger import LoggingFactory from koheesio.spark.delta import DeltaTableStep from koheesio.utils.date_time import DTInterval +log = LoggingFactory.get_logger(name=__name__, inherit_from_koheesio=True) + def is_data_stale( table: DeltaTableStep, @@ -54,6 +57,35 @@ def is_data_stale( data should be refreshed if it has not already. Enforces a maximum period limit of 6 days, 23 hours, 59 minutes and 59 seconds. + Examples + -------- + Assume now is January 31st, 2025 (Friday) 12:00:00 and the last modification dates + in the history are shown alongside the examples. + + Example 1: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day threshold: + ``` + is_stale = is_data_stale(table_name, days=3) + print(is_stale) # True, as the last modification was 3 days and 1 hour ago which is more than 3 days. + ``` + + Example 2: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 1-hour threshold: + ``` + is_stale = is_data_stale(table_name, days=3, hours=1) + print(is_stale) # True, as the last modification was 3 days and 1 hour ago which is the same as the threshold. + ``` + + Example 3: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 2-hour threshold: + ``` + is_stale = is_data_stale(table_name, days=3, hours=2) + print(is_stale) # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours. + ``` + + Example 4: Last modified on January 28th, 2025, 11:00:00 checking with a 5-day threshold and refresh_day_num = 5 (Friday): + ``` + is_stale = is_data_stale(table_name, days=5, refresh_day_num=5) + print(is_stale) # True, 3 days and 1 hour is less than 5 days but refresh_day_num is the same as the current day. 
+ ``` + Returns ------- bool @@ -69,6 +101,7 @@ def is_data_stale( if months > 0: # Convert months to days month_days = int(months * 30.44) # Average month length days += month_days + log.debug(f"Converted {months} months to {month_days} days.") staleness_period = ( timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds) @@ -90,7 +123,8 @@ def is_data_stale( history_df = DeltaTableStep(table=table).describe_history() if not history_df: - return True # Consider data stale if no history exists + log.debug(f"No history found for `{table}`.") + return True # Consider data stale if the table does not exist modification_operations = [ "WRITE", @@ -110,16 +144,20 @@ def is_data_stale( last_modification = history_df.select("timestamp").first() if not last_modification: - return True # No modification operation found in the history + log.debug(f"No modification operation found in the history for `{table}`.") + return True last_modification_timestamp = last_modification["timestamp"] cut_off_date = current_time - staleness_period + log.debug(f"Last modification timestamp: {last_modification_timestamp}, cut-off date: {cut_off_date}") + is_stale_by_time = last_modification_timestamp <= cut_off_date if refresh_day_num is not None: current_day_of_week = current_time.weekday() + log.debug(f"Current day of the week: {current_day_of_week}, refresh day: {refresh_day_num}") is_appropriate_day_for_refresh = current_day_of_week == refresh_day_num return is_stale_by_time or is_appropriate_day_for_refresh diff --git a/src/koheesio/utils/date_time.py b/src/koheesio/utils/date_time.py index fc608128..6b6281ac 100644 --- a/src/koheesio/utils/date_time.py +++ b/src/koheesio/utils/date_time.py @@ -26,17 +26,62 @@ class DTInterval(BaseModel): """ A class to define date and time intervals using human-readable strings or individual time components. + + Parameters + ---------- + interval : str, optional + A human-readable string specifying the duration of the interval, broken down + into years, months, weeks, days, hours, minutes, seconds, milliseconds, and microseconds. + years : int, optional, default 0 + Number of years in the interval. + months : int, optional, default 0 + Number of months in the interval. + weeks : int, optional, default 0 + Number of weeks in the interval. + days : int, optional, default 0 + Number of days in the interval. + hours : int, optional, default 0 + Number of hours in the interval. + minutes : int, optional, default 0 + Number of minutes in the interval. + seconds : int, optional, default 0 + Number of seconds in the interval. + milliseconds : int, optional, default 0 + Number of milliseconds in the interval. + microseconds : int, optional, default 0 + Number of microseconds in the interval. + + Examples + -------- + Creating an instance with time components: + + ``` + print(DTInterval(years=2, weeks=3, hours=12).to_timedelta) + ``` + 751 days, 12:00:00 + + Creating an instance from a string: + ``` + print(DTInterval(interval="1 year 2 months 3 weeks 4 days 5 hours 100 minutes 200 seconds 300 milliseconds 400 microseconds").to_timedelta) + ``` + 451 days, 6:43:20.300400 + + Methods + ------- + to_timedelta() + Converts the DTInterval instance to a timedelta object, aggregating all specified time components. 
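+
+    Notes
+    -----
+    `to_timedelta` is exposed as a property. Years and months are first converted
+    to days by the `calculate_days` validator (365.25 and 30.44 days on average,
+    respectively), since `timedelta` does not support them directly.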
""" + interval: Optional[str] = None - years: int = 0 # = days * 365.25 (average year length, rounded up) - months: int = 0 # = days * 30.44 (average month length, rounded up) - weeks: int = 0 - days: int = 0 - hours: int = 0 - minutes: int = 0 - seconds: int = 0 - milliseconds: int = 0 - microseconds: int = 0 + years: Optional[int] = 0 # = days * 365.25 (average year length, rounded up) + months: Optional[int] = 0 # = days * 30.44 (average month length, rounded up) + weeks: Optional[int] = 0 + days: Optional[int] = 0 + hours: Optional[int] = 0 + minutes: Optional[int] = 0 + seconds: Optional[int] = 0 + milliseconds: Optional[int] = 0 + microseconds: Optional[int] = 0 @model_validator(mode="before") def process_interval(cls, values: dict) -> dict: diff --git a/tests/spark/delta/test_delta_utils.py b/tests/spark/delta/test_delta_utils.py index 7c82c866..b8d4eee1 100644 --- a/tests/spark/delta/test_delta_utils.py +++ b/tests/spark/delta/test_delta_utils.py @@ -1,10 +1,10 @@ from datetime import datetime -import pytest from freezegun import freeze_time +import pytest -from koheesio.utils.date_time import DTInterval from koheesio.spark.delta.utils import is_data_stale +from koheesio.utils.date_time import DTInterval @pytest.fixture From db36653cd7aae5e201d52a5e651a56ae3d5a53a5 Mon Sep 17 00:00:00 2001 From: zarembat Date: Tue, 4 Feb 2025 19:50:59 +0100 Subject: [PATCH 3/6] Addressed remarks from PR review: enhanced docstring, additional logging, fixed tests. --- src/koheesio/spark/delta/__init__.py | 2 +- src/koheesio/spark/delta/utils.py | 7 +++++++ src/koheesio/spark/writers/delta/batch.py | 6 +++++- tests/spark/delta/test_delta.py | 15 ++++----------- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/koheesio/spark/delta/__init__.py b/src/koheesio/spark/delta/__init__.py index eb527192..cda3e829 100644 --- a/src/koheesio/spark/delta/__init__.py +++ b/src/koheesio/spark/delta/__init__.py @@ -214,7 +214,7 @@ def _alter_table() -> None: if override: self.log.debug( f"Property `{key}` presents in `{self.table_name}` and has value `{persisted_properties[key]}`." - f"Override is enabled.The value will be changed to `{v_str}`." + f"Override is enabled. The value will be changed to `{v_str}`." ) _alter_table() else: diff --git a/src/koheesio/spark/delta/utils.py b/src/koheesio/spark/delta/utils.py index ab90c2d0..ed0a58bf 100644 --- a/src/koheesio/spark/delta/utils.py +++ b/src/koheesio/spark/delta/utils.py @@ -91,6 +91,13 @@ def is_data_stale( bool True if data is considered stale by exceeding the defined time limits or if the current day equals to `refresh_day_num`. Returns False if conditions are not met. + + Raises + ------ + ValueError + If neither time components nor `dt_interval` are provided. + If `refresh_day_num` is not between 0 and 6. + If the total period exceeds 7 days when `refresh_day_num` is set. """ if not any((months, weeks, days, hours, minutes, seconds)) and dt_interval is None: diff --git a/src/koheesio/spark/writers/delta/batch.py b/src/koheesio/spark/writers/delta/batch.py index 823ecbd1..f3d29cd4 100644 --- a/src/koheesio/spark/writers/delta/batch.py +++ b/src/koheesio/spark/writers/delta/batch.py @@ -385,7 +385,11 @@ def execute(self) -> Writer.Output: if self.table.create_if_not_exists and not self.table.exists: _writer = _writer.options(**self.table.default_create_properties) - + message = ( + f"Table `{self.table}` doesn't exist. The `create_if_not_exists` flag is set to True. " + "Therefore the table will be created." 
+ ) + self.log.info(message) if isinstance(_writer, DeltaMergeBuilder) or type(_writer).__name__ == "DeltaMergeBuilder": _writer.execute() else: diff --git a/tests/spark/delta/test_delta.py b/tests/spark/delta/test_delta.py index 4d09ad53..cf352a14 100644 --- a/tests/spark/delta/test_delta.py +++ b/tests/spark/delta/test_delta.py @@ -156,7 +156,6 @@ def test_exists(caplog, table, create_if_not_exists, log_level): dt = DeltaTableStep(table=table, create_if_not_exists=create_if_not_exists) dt.log.setLevel(log_level) assert dt.exists is False - assert f"The `create_if_not_exists` flag is set to {create_if_not_exists}." in caplog.text @pytest.fixture @@ -170,6 +169,7 @@ def test_history_df(spark): def test_describe_history__no_limit(mocker, spark, test_history_df): + mocker.patch.object(DeltaTableStep, "exists", new_callable=mocker.PropertyMock(return_value=True)) dt = DeltaTableStep(table="test_table") mocker.patch.object(spark, "sql", return_value=test_history_df) result = dt.describe_history() @@ -184,6 +184,7 @@ def test_describe_history__no_limit(mocker, spark, test_history_df): def test_describe_history__with_limit(mocker, spark, test_history_df): + mocker.patch.object(DeltaTableStep, "exists", new_callable=mocker.PropertyMock(return_value=True)) dt = DeltaTableStep(table="test_table") mocker.patch.object(spark, "sql", return_value=test_history_df) result = dt.describe_history(limit=1) @@ -195,17 +196,9 @@ def test_describe_history__with_limit(mocker, spark, test_history_df): assert_df_equality(result, expected_df, ignore_column_order=True) -def test_describe_history__no_table(mocker, spark): +def test_describe_history__no_table(mocker): + mocker.patch.object(DeltaTableStep, "exists", new_callable=mocker.PropertyMock(return_value=False)) dt = DeltaTableStep(table="test_table") - mocker.patch.object(spark, "sql", side_effect=AnalysisException("[TABLE_OR_VIEW_NOT_FOUND]")) result = dt.describe_history() assert result is None - - -def test_describe_history__error(mocker, spark): - dt = DeltaTableStep(table="test_table") - mocker.patch.object(spark, "sql", side_effect=AnalysisException("Some other error")) - - with pytest.raises(AnalysisException): - dt.describe_history() From f19c77eafe55f05779e0f5bbf111bcc95646e65f Mon Sep 17 00:00:00 2001 From: zarembat Date: Thu, 6 Feb 2025 08:49:20 +0100 Subject: [PATCH 4/6] Converted is_data_stale into a Step --- src/koheesio/spark/delta/utils.py | 189 +++++++++++++------------- tests/spark/delta/test_delta_utils.py | 134 ++++++------------ 2 files changed, 135 insertions(+), 188 deletions(-) diff --git a/src/koheesio/spark/delta/utils.py b/src/koheesio/spark/delta/utils.py index ed0a58bf..2b655b34 100644 --- a/src/koheesio/spark/delta/utils.py +++ b/src/koheesio/spark/delta/utils.py @@ -1,57 +1,34 @@ """ Utils for working with Delta tables. 
""" +from typing import Optional, Union from datetime import datetime, timedelta from koheesio.logger import LoggingFactory +from koheesio.models import Field, field_validator, model_validator from koheesio.spark.delta import DeltaTableStep -from koheesio.utils.date_time import DTInterval +from koheesio.steps import Step, StepOutput log = LoggingFactory.get_logger(name=__name__, inherit_from_koheesio=True) -def is_data_stale( - table: DeltaTableStep, - months: int = 0, - weeks: int = 0, - days: int = 0, - hours: int = 0, - minutes: int = 0, - seconds: int = 0, - dt_interval: DTInterval = None, - refresh_day_num: int = None, -) -> bool: +class StaleDataCheckStep(Step): """ - Determines if the data inside a table is stale based on the elapsed time since + Determines if the data inside the table is stale based on the elapsed time since the last modification and, optionally, based on the current week day. - The function allows specifying limits in terms of months, weeks, days, hours, minutes, and seconds to determine - how old data can be before it is considered stale. If `refresh_day_num` is provided, it adds an extra condition - to mark data as stale if the current day matches with the specified weekday. + The staleness interval is specified as a `timedelta` object. + If `refresh_day_num` is provided, it adds an extra condition to mark the data as stale if the current day matches with the specified weekday. - The last modification date is taken from the Delta Log. + The date of the last modification of the table is taken from the Delta Log. Parameters ---------- - table : str - The path to the table to check. - months : int, default 0 - Threshold in months to determine staleness. - weeks : int, default 0 - Threshold in weeks. - days : int, default 0 - Threshold in days. - hours : int, default 0 - Threshold in hours. - minutes : int, default 0 - Threshold in minutes. - seconds : int, default 0 - Threshold in seconds. - dt_interval : DTInterval, optional - An alternative to directly specifying time components. - This should be an instance of DTInterval, which provides - the `to_timedelta` method that converts structured time - descriptions into a timedelta object. + table : Union[DeltaTableStep, str] + The table to check for stale data. + interval : timedelta + The interval to consider data stale. Users can pass a `timedelta` object or an ISO-8601 compliant string representing the interval. + For example `P1W3DT2H30M` is equivalent to `timedelta(weeks=1, days=3, hours=2, minutes=30)`. refresh_day_num : int, optional The weekday number (0=Monday, 6=Sunday) on which data should be refreshed if it has not already. Enforces @@ -64,25 +41,31 @@ def is_data_stale( Example 1: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day threshold: ``` - is_stale = is_data_stale(table_name, days=3) + is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=3)).execute().is_data_stale print(is_stale) # True, as the last modification was 3 days and 1 hour ago which is more than 3 days. ``` Example 2: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 1-hour threshold: ``` - is_stale = is_data_stale(table_name, days=3, hours=1) + is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=3, hours=1)).execute().is_data_stale print(is_stale) # True, as the last modification was 3 days and 1 hour ago which is the same as the threshold. 
``` Example 3: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 2-hour threshold: ``` - is_stale = is_data_stale(table_name, days=3, hours=2) + is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=3, hours=2)).execute().is_data_stale print(is_stale) # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours. ``` - Example 4: Last modified on January 28th, 2025, 11:00:00 checking with a 5-day threshold and refresh_day_num = 5 (Friday): + Example 4: Same as example 3 but with the interval defined as an ISO-8601 string: ``` - is_stale = is_data_stale(table_name, days=5, refresh_day_num=5) + is_stale = StaleDataCheckStep(table=table, interval="P3DT2H").execute().is_data_stale + print(is_stale) # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours. + ``` + + Example 5: Last modified on January 28th, 2025, 11:00:00 checking with a 5-day threshold and refresh_day_num = 5 (Friday): + ``` + is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=5), refresh_day_num=5).execute().is_data_stale print(is_stale) # True, 3 days and 1 hour is less than 5 days but refresh_day_num is the same as the current day. ``` @@ -95,78 +78,96 @@ def is_data_stale( Raises ------ ValueError - If neither time components nor `dt_interval` are provided. If `refresh_day_num` is not between 0 and 6. If the total period exceeds 7 days when `refresh_day_num` is set. """ - if not any((months, weeks, days, hours, minutes, seconds)) and dt_interval is None: - raise ValueError( - "You must provide either time components (months, weeks, days, hours, minutes, seconds) or dt_interval." - ) + table: Union[DeltaTableStep, str] = Field( + ..., + description="The table to check for stale data.", + ) + interval: timedelta = Field( + ..., + description="The interval to consider data stale.", + ) + refresh_day_num: Optional[int] = Field( + default=None, + description="The weekday number on which data should be refreshed.", + ) + + class Output(StepOutput): + """Output class for StaleDataCheckStep.""" - if months > 0: # Convert months to days - month_days = int(months * 30.44) # Average month length - days += month_days - log.debug(f"Converted {months} months to {month_days} days.") + is_data_stale: bool = Field(..., description="Boolean flag indicating whether data in the table is stale or not") - staleness_period = ( - timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds) - if any((weeks, days, hours, minutes, seconds)) - else dt_interval.to_timedelta - ) + @field_validator("table") + def _validate_table(cls, table: Union[DeltaTableStep, str]) -> Union[DeltaTableStep, str]: + """Validate `table` value""" + if isinstance(table, str): + return DeltaTableStep(table=table) + return table + + @model_validator(mode="after") + def _validate_refresh_day_num(self) -> "StaleDataCheckStep": + """Validate input when `refresh_day_num` is provided.""" + if self.refresh_day_num is not None: + if not 0 <= self.refresh_day_num <= 6: + raise ValueError("refresh_day_num should be between 0 (Monday) and 6 (Sunday).") - if refresh_day_num is not None: - if not 0 <= refresh_day_num <= 6: - raise ValueError("refresh_day_num should be between 0 (Monday) and 6 (Sunday).") + max_period = timedelta(days=6, hours=23, minutes=59, seconds=59) + if self.interval > max_period: + raise ValueError("With refresh_day_num set, the total period must be less than 7 days.") - max_period = timedelta(days=6, hours=23, 
minutes=59, seconds=59) - if staleness_period > max_period: - raise ValueError("With refresh_day_num set, the total period must be less than 7 days.") + return self - current_time = datetime.now() + def execute(self) -> Output: - # Get the history of the Delta table - history_df = DeltaTableStep(table=table).describe_history() + # Get the history of the Delta table + history_df = self.table.describe_history() - if not history_df: - log.debug(f"No history found for `{table}`.") - return True # Consider data stale if the table does not exist + if not history_df: + log.debug(f"No history found for `{self.table.table_name}`.") + self.output.is_data_stale = True # Consider data stale if the table does not exist + return self.output - modification_operations = [ - "WRITE", - "MERGE", - "DELETE", - "UPDATE", - "REPLACE TABLE AS SELECT", - "CREATE TABLE AS SELECT", - "TRUNCATE", - "RESTORE", - ] + modification_operations = [ + "WRITE", + "MERGE", + "DELETE", + "UPDATE", + "REPLACE TABLE AS SELECT", + "CREATE TABLE AS SELECT", + "TRUNCATE", + "RESTORE", + ] - # Filter the history to data modification operations only - history_df = history_df.filter(history_df["operation"].isin(modification_operations)) + # Filter the history to data modification operations only + history_df = history_df.filter(history_df["operation"].isin(modification_operations)) - # Get the last modification operation's timestamp - last_modification = history_df.select("timestamp").first() + # Get the last modification operation's timestamp + last_modification = history_df.select("timestamp").first() - if not last_modification: - log.debug(f"No modification operation found in the history for `{table}`.") - return True + if not last_modification: + log.debug(f"No modification operation found in the history for `{self.table.table_name}`.") + self.output.is_data_stale = True + return self.output - last_modification_timestamp = last_modification["timestamp"] + current_time = datetime.now() + last_modification_timestamp = last_modification["timestamp"] - cut_off_date = current_time - staleness_period + cut_off_date = current_time - self.interval - log.debug(f"Last modification timestamp: {last_modification_timestamp}, cut-off date: {cut_off_date}") + log.debug(f"Last modification timestamp: {last_modification_timestamp}, cut-off date: {cut_off_date}") - is_stale_by_time = last_modification_timestamp <= cut_off_date + is_stale_by_time = last_modification_timestamp <= cut_off_date - if refresh_day_num is not None: - current_day_of_week = current_time.weekday() - log.debug(f"Current day of the week: {current_day_of_week}, refresh day: {refresh_day_num}") + if self.refresh_day_num is not None: + current_day_of_week = current_time.weekday() + log.debug(f"Current day of the week: {current_day_of_week}, refresh day: {self.refresh_day_num}") - is_appropriate_day_for_refresh = current_day_of_week == refresh_day_num - return is_stale_by_time or is_appropriate_day_for_refresh + is_appropriate_day_for_refresh = current_day_of_week == self.refresh_day_num + self.output.is_data_stale = is_stale_by_time or is_appropriate_day_for_refresh + return self.output - return is_stale_by_time + self.output.is_data_stale = is_stale_by_time + return self.output diff --git a/tests/spark/delta/test_delta_utils.py b/tests/spark/delta/test_delta_utils.py index b8d4eee1..641f9dbb 100644 --- a/tests/spark/delta/test_delta_utils.py +++ b/tests/spark/delta/test_delta_utils.py @@ -1,17 +1,20 @@ -from datetime import datetime +from datetime import datetime, timedelta 
from freezegun import freeze_time import pytest -from koheesio.spark.delta.utils import is_data_stale -from koheesio.utils.date_time import DTInterval +from koheesio.spark.delta.utils import StaleDataCheckStep @pytest.fixture def test_history_df(spark): return spark.createDataFrame( [ - {"timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=29, second=30), "operation": "WRITE", "version": 1}, + { + "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=29, second=30), + "operation": "WRITE", + "version": 1 + }, { "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=28, second=30), "operation": "CREATE TABLE", @@ -22,88 +25,36 @@ def test_history_df(spark): @pytest.mark.parametrize( - "months, weeks, days, hours, minutes, seconds, expected", + "interval, expected", [ - (1, 0, 0, 0, 0, 0, False), # Not stale, since 1 month > 1 day and some hours ago - (0, 1, 0, 0, 0, 0, False), # Not stale, since 1 week > 1 day and some hours ago - (0, 0, 2, 0, 0, 0, False), # Not stale, since 2 days > 1 day and some hours ago - (0, 0, 1, 7, 0, 0, False), # Not stale, since 1 day and 7 hours > 1 day, 6 hours, 30 minutes - (0, 0, 1, 6, 31, 0, False), # Not stale, since 1 minute more than the timestamp age - (0, 0, 1, 6, 30, 31, False), # Not stale, since 1 second more than the timestamp age - (0, 0, 1, 6, 30, 30, True), # Exactly equal to the age, should be considered stale - (0, 0, 1, 6, 30, 29, True), # Stale, falls 1 second short - (0, 0, 1, 0, 0, 0, True), # Stale, falls several hours and minutes short - ( - 0, - 0, - 0, - 18, - 0, - 0, - True, - ), # Stale, despite being more than half a day, but less than the full duration since the timestamp + (timedelta(weeks=1), False), # Not stale, since 1 week > 1 day and some hours ago + (timedelta(days=2), False), # Not stale, since 2 days > 1 day and some hours ago + (timedelta(days=1, hours=7), False), # Not stale, since 1 day and 7 hours > 1 day, 6 hours, 30 minutes + (timedelta(days=1, hours=6, minutes=31), False), # Not stale, since 1 minute more than the timestamp age + (timedelta(days=1, hours=6, minutes=30, seconds=31), False), # Not stale, since 1 second more than the timestamp age + (timedelta(days=1, hours=6, minutes=30, seconds=30), True), # Exactly equal to the age, should be considered stale + (timedelta(days=1, hours=6, minutes=30, seconds=29), True), # Stale, falls 1 second short + (timedelta(days=1), True), # Stale, falls several hours and minutes short + (timedelta(hours=18), True) # Stale, despite being more than half a day, but less than the full duration since the timestamp ], ) @freeze_time("2024-12-31 12:00:00") -def test_is_data_stale__no_refresh_day_num_with_time_components( - months, weeks, days, hours, minutes, seconds, expected, test_history_df, mocker +def test_stale_data_check_step__no_refresh_day_num_with_time_components( + interval, expected, test_history_df, mocker ): - mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df) - assert ( - is_data_stale( - "dummy_table", months=months, weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds - ) + StaleDataCheckStep(table="dummy_table", interval=interval).execute().is_data_stale == expected ) -@pytest.mark.parametrize( - "dt_interval, expected", - [ - (DTInterval(interval="2 days"), False), # Not stale, since 2 days > 1 day and some hours ago - ( - DTInterval(interval="1 day, 7 hours"), - False, - ), # Not stale, since 1 day and 7 hours > 1 day, 6 hours, 30 minutes - ( - 
DTInterval(interval="1 day, 6 hours, 31 minutes"), - False, - ), # Not stale, since 1 minute more than the timestamp age - ( - DTInterval(interval="1 day, 6 hours, 30 minutes, 31 seconds"), - False, - ), # Not stale, since 1 second more than the timestamp age - ( - DTInterval(interval="1 day, 6 hours, 30 minutes, 30 seconds"), - True, - ), # Exactly equal to the age, should be considered stale - (DTInterval(interval="1 day, 6 hours, 30 minutes, 29 seconds"), True), # Stale, falls 1 second short - (DTInterval(interval="1 day"), True), # Stale, falls several hours and minutes short - ( - DTInterval(interval="18 hours"), - True, - ), # Stale, despite being more than half a day, but less than the full duration since the timestamp - ], -) -@freeze_time("2024-12-31 12:00:00") -def test_is_data_stale__no_refresh_day_num_with_dt_interval(dt_interval, expected, test_history_df, mocker): - - mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df) - - assert is_data_stale("dummy_table", dt_interval=dt_interval) == expected - - -def test_is_data_stale__no_table(mocker): - +def test_stale_data_check_step__no_table(mocker): mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=None) - - assert is_data_stale("dummy_table", days=1) + assert StaleDataCheckStep(table="dummy_table", interval=timedelta(days=1)).execute().is_data_stale -def test_is_data_stale__no_modification_history(mocker, spark): - +def test_stale_data_check_step__no_modification_history(mocker, spark): history_df = spark.createDataFrame( [ { @@ -112,61 +63,56 @@ def test_is_data_stale__no_modification_history(mocker, spark): } ] ) - mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=history_df) - - assert is_data_stale("dummy_table", days=1) + assert StaleDataCheckStep(table="dummy_table", interval=timedelta(days=1)).execute().is_data_stale @pytest.mark.parametrize( "interval, refresh_day_num, expected", [ ( - DTInterval(interval="2 days, 6 hours, 30 minutes, 30 seconds"), + timedelta(days=2, hours=6, minutes=30, seconds=30), 2, False, ), # Data is not stale and it's before the refresh day ( - DTInterval(interval="2 days, 6 hours, 30 minutes, 30 seconds"), + timedelta(days=2, hours=6, minutes=30, seconds=30), 1, True, ), # Data is not stale but it is the refresh day ( - DTInterval(interval="2 days, 6 hours, 30 minutes, 30 seconds"), + timedelta(days=2, hours=6, minutes=30, seconds=30), 0, False, ), # Data is not stale and it is past the refresh day ( - DTInterval(interval="6 hours, 30 minutes, 30 seconds"), + timedelta(hours=6, minutes=30, seconds=30), 2, True, ), # Data is stale and it's before the refresh day - (DTInterval(interval="6 hours, 30 minutes, 30 seconds"), 1, True), # Data is stale and it is the refresh day ( - DTInterval(interval="6 hours, 30 minutes, 30 seconds"), + timedelta(hours=6, minutes=30, seconds=30), + 1, + True + ), # Data is stale and it is the refresh day + ( + timedelta(hours=6, minutes=30, seconds=30), 0, True, ), # Data is stale and it is past the refresh day ], ) @freeze_time("2024-12-31 12:00:00") # Tuesday -def test_is_data_stale__with_refresh_day(interval, refresh_day_num, expected, test_history_df, mocker): - +def test_stale_data_check_step__with_refresh_day(interval, refresh_day_num, expected, test_history_df, mocker): mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df) - - assert is_data_stale("dummy_table", dt_interval=interval, 
refresh_day_num=refresh_day_num) == expected - - -def test_is_data_stale__missing_input(): - with pytest.raises(ValueError): - is_data_stale("dummy_table") + assert StaleDataCheckStep(table="dummy_table", interval=interval, refresh_day_num=refresh_day_num).execute().is_data_stale == expected -def test_is_data_stale__invalid_refresh_day(): +def test_stale_data_check_step__invalid_refresh_day(): with pytest.raises(ValueError): - is_data_stale("dummy_table", dt_interval=DTInterval(interval="1 day"), refresh_day_num=7) + StaleDataCheckStep(table="dummy_table", interval=timedelta(days=1), refresh_day_num=7).execute() -def test_is_data_stale__invalid_staleness_period_with_refresh_day(): +def test_stale_data_check_step__invalid_staleness_period_with_refresh_day(): with pytest.raises(ValueError): - is_data_stale("dummy_table", dt_interval=DTInterval(interval="1 month"), refresh_day_num=5) + StaleDataCheckStep(table="dummy_table", interval=timedelta(days=10), refresh_day_num=5).execute() From 514389251d71c1d3cf69cfb132b0bb414e3b6adf Mon Sep 17 00:00:00 2001 From: zarembat Date: Thu, 6 Feb 2025 13:21:39 +0100 Subject: [PATCH 5/6] Moved StaleDataCheckStep to spark/delta.py + minor improvements --- .../spark/{delta/__init__.py => delta.py} | 165 +++++++++++++++++ src/koheesio/spark/delta/utils.py | 173 ------------------ tests/spark/delta/test_delta.py | 130 ++++++++++++- tests/spark/delta/test_delta_utils.py | 118 ------------ 4 files changed, 286 insertions(+), 300 deletions(-) rename src/koheesio/spark/{delta/__init__.py => delta.py} (65%) delete mode 100644 src/koheesio/spark/delta/utils.py delete mode 100644 tests/spark/delta/test_delta_utils.py diff --git a/src/koheesio/spark/delta/__init__.py b/src/koheesio/spark/delta.py similarity index 65% rename from src/koheesio/spark/delta/__init__.py rename to src/koheesio/spark/delta.py index cda3e829..7d450aa1 100644 --- a/src/koheesio/spark/delta/__init__.py +++ b/src/koheesio/spark/delta.py @@ -3,15 +3,20 @@ """ from typing import Dict, List, Optional, Union +from datetime import datetime, timedelta import warnings from py4j.protocol import Py4JJavaError # type: ignore from pyspark.sql.types import DataType +from koheesio.logger import LoggingFactory from koheesio.models import Field, field_validator, model_validator from koheesio.spark import AnalysisException, DataFrame, SparkStep from koheesio.spark.utils import on_databricks +from koheesio.steps import Step, StepOutput + +log = LoggingFactory.get_logger(name=__name__, inherit_from_koheesio=True) class DeltaTableStep(SparkStep): @@ -349,3 +354,163 @@ def describe_history(self, limit: Optional[int] = None) -> Optional[DataFrame]: return history_df else: self.log.warning(f"Table `{self.table_name}` does not exist.") + + +class StaleDataCheckStep(Step): + """ + Determines if the data inside the Delta table is stale based on the elapsed time since + the last modification and, optionally, based on the current week day. + + The staleness interval is specified as a `timedelta` object. + If `refresh_day_num` is provided, it adds an extra condition to mark the data as stale if the current day matches with the specified weekday. + + The date of the last modification of the table is taken from the Delta Log. + + Parameters + ---------- + table : Union[DeltaTableStep, str] + The table to check for stale data. + interval : timedelta + The interval to consider data stale. Users can pass a `timedelta` object or an ISO-8601 compliant string representing the interval. 
+        For example `P1W3DT2H30M` is equivalent to `timedelta(weeks=1, days=3, hours=2, minutes=30)`.
+    refresh_day_num : int, optional
+        The weekday number (0=Monday, 6=Sunday) on which
+        data should be refreshed if it has not already. Enforces
+        a maximum period limit of 6 days, 23 hours, 59 minutes and 59 seconds.
+
+    Examples
+    --------
+    Assume now is January 31st, 2025 (Friday) 12:00:00 and the last modification dates
+    in the history are shown alongside the examples.
+
+    Example 1: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day threshold:
+    ```
+    is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=3)).execute().is_data_stale
+    print(is_stale)  # True, as the last modification was 3 days and 1 hour ago which is more than 3 days.
+    ```
+
+    Example 2: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 1-hour threshold:
+    ```
+    is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=3, hours=1)).execute().is_data_stale
+    print(is_stale)  # True, as the last modification was 3 days and 1 hour ago which is the same as the threshold.
+    ```
+
+    Example 3: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 2-hour threshold:
+    ```
+    is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=3, hours=2)).execute().is_data_stale
+    print(is_stale)  # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours.
+    ```
+
+    Example 4: Same as example 3 but with the interval defined as an ISO-8601 string:
+    ```
+    is_stale = StaleDataCheckStep(table=table, interval="P3DT2H").execute().is_data_stale
+    print(is_stale)  # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours.
+    ```
+
+    Example 5: Last modified on January 28th, 2025, 11:00:00 checking with a 5-day threshold and refresh_day_num = 5 (Friday):
+    ```
+    is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=5), refresh_day_num=5).execute().is_data_stale
+    print(is_stale)  # True, 3 days and 1 hour is less than 5 days but refresh_day_num is the same as the current day.
+    ```
+
+    Returns
+    -------
+    bool
+        True if data is considered stale by exceeding the defined time limits or if the current
+        day equals `refresh_day_num`. False otherwise.
+
+    Raises
+    ------
+    ValueError
+        If the total period exceeds 7 days when `refresh_day_num` is set.
+    ValidationError
+        If `refresh_day_num` is not between 0 and 6.
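+
+    Notes
+    -----
+    The bounds on `refresh_day_num` are enforced at model construction through
+    pydantic field constraints (`ge=0`, `le=6`), which is why an out-of-range
+    value raises a `ValidationError` rather than a `ValueError`.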
+    """
+
+    table: Union[DeltaTableStep, str] = Field(
+        ...,
+        description="The table to check for stale data.",
+    )
+    interval: timedelta = Field(
+        ...,
+        description="The interval to consider data stale.",
+    )
+    refresh_day_num: Optional[int] = Field(
+        default=None,
+        description="The weekday number on which data should be refreshed.",
+        ge=0,
+        le=6,
+    )
+
+    class Output(StepOutput):
+        """Output class for StaleDataCheckStep."""
+
+        is_data_stale: bool = Field(..., description="Boolean flag indicating whether data in the table is stale or not")
+
+    @field_validator("table")
+    def _validate_table(cls, table: Union[DeltaTableStep, str]) -> Union[DeltaTableStep, str]:
+        """Validate `table` value"""
+        if isinstance(table, str):
+            return DeltaTableStep(table=table)
+        return table
+
+    @model_validator(mode="after")
+    def _validate_refresh_day_num(self) -> "StaleDataCheckStep":
+        """Validate input when `refresh_day_num` is provided."""
+        if self.refresh_day_num is not None:
+            max_period = timedelta(days=6, hours=23, minutes=59, seconds=59)
+            if self.interval > max_period:
+                raise ValueError("With refresh_day_num set, the total period must be less than 7 days.")
+
+        return self
+
+    def execute(self) -> Output:
+        # Get the history of the Delta table
+        history_df = self.table.describe_history()
+
+        if not history_df:
+            log.debug(f"No history found for `{self.table.table_name}`.")
+            self.output.is_data_stale = True  # Consider data stale if the table does not exist
+            return self.output
+
+        modification_operations = [
+            "WRITE",
+            "MERGE",
+            "DELETE",
+            "UPDATE",
+            "REPLACE TABLE AS SELECT",
+            "CREATE TABLE AS SELECT",
+            "TRUNCATE",
+            "RESTORE",
+        ]
+
+        # Filter the history to data modification operations only
+        history_df = history_df.filter(history_df["operation"].isin(modification_operations))
+
+        # Get the last modification operation's timestamp
+        last_modification = history_df.select("timestamp").first()
+
+        if not last_modification:
+            log.debug(f"No modification operation found in the history for `{self.table.table_name}`.")
+            self.output.is_data_stale = True
+            return self.output
+
+        current_time = datetime.now()
+        last_modification_timestamp = last_modification["timestamp"]
+
+        cut_off_date = current_time - self.interval
+
+        log.debug(f"Last modification timestamp: {last_modification_timestamp}, cut-off date: {cut_off_date}")
+
+        is_stale_by_time = last_modification_timestamp <= cut_off_date
+
+        if self.refresh_day_num is not None:
+            current_day_of_week = current_time.weekday()
+            log.debug(f"Current day of the week: {current_day_of_week}, refresh day: {self.refresh_day_num}")
+
+            is_appropriate_day_for_refresh = current_day_of_week == self.refresh_day_num
+            self.output.is_data_stale = is_stale_by_time or is_appropriate_day_for_refresh
+            return self.output
+
+        self.output.is_data_stale = is_stale_by_time
+        return self.output
diff --git a/src/koheesio/spark/delta/utils.py b/src/koheesio/spark/delta/utils.py
deleted file mode 100644
index 2b655b34..00000000
--- a/src/koheesio/spark/delta/utils.py
+++ /dev/null
@@ -1,173 +0,0 @@
-"""
-Utils for working with Delta tables.
-"""
-from typing import Optional, Union
-from datetime import datetime, timedelta
-
-from koheesio.logger import LoggingFactory
-from koheesio.models import Field, field_validator, model_validator
-from koheesio.spark.delta import DeltaTableStep
-from koheesio.steps import Step, StepOutput
-
-log = LoggingFactory.get_logger(name=__name__, inherit_from_koheesio=True)
-
-
-class StaleDataCheckStep(Step):
-    """
-    Determines if the data inside the table is stale based on the elapsed time since
-    the last modification and, optionally, based on the current week day.
-
-    The staleness interval is specified as a `timedelta` object.
-    If `refresh_day_num` is provided, it adds an extra condition to mark the data as stale if the current day matches with the specified weekday.
-
-    The date of the last modification of the table is taken from the Delta Log.
-
-    Parameters
-    ----------
-    table : Union[DeltaTableStep, str]
-        The table to check for stale data.
-    interval : timedelta
-        The interval to consider data stale. Users can pass a `timedelta` object or an ISO-8601 compliant string representing the interval.
-        For example `P1W3DT2H30M` is equivalent to `timedelta(weeks=1, days=3, hours=2, minutes=30)`.
-    refresh_day_num : int, optional
-        The weekday number (0=Monday, 6=Sunday) on which
-        data should be refreshed if it has not already. Enforces
-        a maximum period limit of 6 days, 23 hours, 59 minutes and 59 seconds.
-
-    Examples
-    --------
-    Assume now is January 31st, 2025 (Friday) 12:00:00 and the last modification dates
-    in the history are shown alongside the examples.
-
-    Example 1: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day threshold:
-    ```
-    is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=3)).execute().is_data_stale
-    print(is_stale)  # True, as the last modification was 3 days and 1 hour ago which is more than 3 days.
-    ```
-
-    Example 2: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 1-hour threshold:
-    ```
-    is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=3, hours=1)).execute().is_data_stale
-    print(is_stale)  # True, as the last modification was 3 days and 1 hour ago which is the same as the threshold.
-    ```
-
-    Example 3: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 2-hour threshold:
-    ```
-    is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=3, hours=2)).execute().is_data_stale
-    print(is_stale)  # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours.
-    ```
-
-    Example 4: Same as example 3 but with the interval defined as an ISO-8601 string:
-    ```
-    is_stale = StaleDataCheckStep(table=table, interval="P3DT2H").execute().is_data_stale
-    print(is_stale)  # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours.
-    ```
-
-    Example 5: Last modified on January 28th, 2025, 11:00:00 checking with a 5-day threshold and refresh_day_num = 5 (Friday):
-    ```
-    is_stale = StaleDataCheckStep(table=table, interval=timedelta(days=5), refresh_day_num=5).execute().is_data_stale
-    print(is_stale)  # True, 3 days and 1 hour is less than 5 days but refresh_day_num is the same as the current day.
-    ```
-
-    Returns
-    -------
-    bool
-        True if data is considered stale by exceeding the defined time limits or if the current
-        day equals to `refresh_day_num`. Returns False if conditions are not met.
-
-    Raises
-    ------
-    ValueError
-        If `refresh_day_num` is not between 0 and 6.
-        If the total period exceeds 7 days when `refresh_day_num` is set.
-    """
-
-    table: Union[DeltaTableStep, str] = Field(
-        ...,
-        description="The table to check for stale data.",
-    )
-    interval: timedelta = Field(
-        ...,
-        description="The interval to consider data stale.",
-    )
-    refresh_day_num: Optional[int] = Field(
-        default=None,
-        description="The weekday number on which data should be refreshed.",
-    )
-
-    class Output(StepOutput):
-        """Output class for StaleDataCheckStep."""
-
-        is_data_stale: bool = Field(..., description="Boolean flag indicating whether data in the table is stale or not")
-
-    @field_validator("table")
-    def _validate_table(cls, table: Union[DeltaTableStep, str]) -> Union[DeltaTableStep, str]:
-        """Validate `table` value"""
-        if isinstance(table, str):
-            return DeltaTableStep(table=table)
-        return table
-
-    @model_validator(mode="after")
-    def _validate_refresh_day_num(self) -> "StaleDataCheckStep":
-        """Validate input when `refresh_day_num` is provided."""
-        if self.refresh_day_num is not None:
-            if not 0 <= self.refresh_day_num <= 6:
-                raise ValueError("refresh_day_num should be between 0 (Monday) and 6 (Sunday).")
-
-            max_period = timedelta(days=6, hours=23, minutes=59, seconds=59)
-            if self.interval > max_period:
-                raise ValueError("With refresh_day_num set, the total period must be less than 7 days.")
-
-        return self
-
-    def execute(self) -> Output:
-
-        # Get the history of the Delta table
-        history_df = self.table.describe_history()
-
-        if not history_df:
-            log.debug(f"No history found for `{self.table.table_name}`.")
-            self.output.is_data_stale = True  # Consider data stale if the table does not exist
-            return self.output
-
-        modification_operations = [
-            "WRITE",
-            "MERGE",
-            "DELETE",
-            "UPDATE",
-            "REPLACE TABLE AS SELECT",
-            "CREATE TABLE AS SELECT",
-            "TRUNCATE",
-            "RESTORE",
-        ]
-
-        # Filter the history to data modification operations only
-        history_df = history_df.filter(history_df["operation"].isin(modification_operations))
-
-        # Get the last modification operation's timestamp
-        last_modification = history_df.select("timestamp").first()
-
-        if not last_modification:
-            log.debug(f"No modification operation found in the history for `{self.table.table_name}`.")
-            self.output.is_data_stale = True
-            return self.output
-
-        current_time = datetime.now()
-        last_modification_timestamp = last_modification["timestamp"]
-
-        cut_off_date = current_time - self.interval
-
-        log.debug(f"Last modification timestamp: {last_modification_timestamp}, cut-off date: {cut_off_date}")
-
-        is_stale_by_time = last_modification_timestamp <= cut_off_date
-
-        if self.refresh_day_num is not None:
-            current_day_of_week = current_time.weekday()
-            log.debug(f"Current day of the week: {current_day_of_week}, refresh day: {self.refresh_day_num}")
-
-            is_appropriate_day_for_refresh = current_day_of_week == self.refresh_day_num
-            self.output.is_data_stale = is_stale_by_time or is_appropriate_day_for_refresh
-            return self.output
-
-        self.output.is_data_stale = is_stale_by_time
-        return self.output
diff --git a/tests/spark/delta/test_delta.py b/tests/spark/delta/test_delta.py
index cf352a14..c231c848 100644
--- a/tests/spark/delta/test_delta.py
+++ b/tests/spark/delta/test_delta.py
@@ -1,19 +1,19 @@
-import logging
+from datetime import datetime, timedelta
 import os
 from pathlib import Path
 from unittest.mock import patch
 
+from chispa import assert_df_equality
 from conftest import setup_test_data
+from freezegun import freeze_time
 import pytest
 
-from chispa import assert_df_equality
 from pydantic import ValidationError
 from pyspark.sql.types import LongType
-from pyspark.sql.utils import AnalysisException
 
 from koheesio.logger import LoggingFactory
-from koheesio.spark.delta import DeltaTableStep
+from koheesio.spark.delta import DeltaTableStep, StaleDataCheckStep
 
 pytestmark = pytest.mark.spark
 
@@ -159,7 +159,7 @@ def test_exists(caplog, table, create_if_not_exists, log_level):
 
 
 @pytest.fixture
-def test_history_df(spark):
+def test_describe_history_df(spark):
     data = [
         {"version": 0, "timestamp": "2024-12-30 12:00:00", "tableName": "test_table", "operation": "CREATE TABLE"},
         {"version": 1, "timestamp": "2024-12-31 05:29:30", "tableName": "test_table", "operation": "WRITE"},
@@ -168,10 +168,10 @@ def test_history_df(spark):
     return spark.createDataFrame(data)
 
 
-def test_describe_history__no_limit(mocker, spark, test_history_df):
+def test_describe_history__no_limit(mocker, spark, test_describe_history_df):
     mocker.patch.object(DeltaTableStep, "exists", new_callable=mocker.PropertyMock(return_value=True))
     dt = DeltaTableStep(table="test_table")
-    mocker.patch.object(spark, "sql", return_value=test_history_df)
+    mocker.patch.object(spark, "sql", return_value=test_describe_history_df)
     result = dt.describe_history()
     expected_df = spark.createDataFrame(
         [
@@ -183,10 +183,10 @@ def test_describe_history__no_limit(mocker, spark, test_history_df):
     assert_df_equality(result, expected_df, ignore_column_order=True)
 
 
-def test_describe_history__with_limit(mocker, spark, test_history_df):
+def test_describe_history__with_limit(mocker, spark, test_describe_history_df):
     mocker.patch.object(DeltaTableStep, "exists", new_callable=mocker.PropertyMock(return_value=True))
     dt = DeltaTableStep(table="test_table")
-    mocker.patch.object(spark, "sql", return_value=test_history_df)
+    mocker.patch.object(spark, "sql", return_value=test_describe_history_df)
     result = dt.describe_history(limit=1)
     expected_df = spark.createDataFrame(
         [
@@ -202,3 +202,115 @@ def test_describe_history__no_table(mocker):
     result = dt.describe_history()
 
     assert result is None
+
+
+@pytest.fixture
+def test_stale_data_check_history_df(spark):
+    return spark.createDataFrame(
+        [
+            {
+                "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=29, second=30),
+                "operation": "WRITE",
+                "version": 1
+            },
+            {
+                "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=28, second=30),
+                "operation": "CREATE TABLE",
+                "version": 0
+            },
+        ]
+    )
+
+
+@pytest.mark.parametrize(
+    "interval, expected",
+    [
+        (timedelta(weeks=1), False),  # Not stale: the 1-week interval far exceeds the data age of 1 day, 6:30:30
+        (timedelta(days=2), False),  # Not stale: 2 days exceeds the data age
+        (timedelta(days=1, hours=7), False),  # Not stale: 1 day and 7 hours exceeds 1 day, 6:30:30
+        (timedelta(days=1, hours=6, minutes=31), False),  # Not stale: 30 seconds longer than the data age
+        (timedelta(days=1, hours=6, minutes=30, seconds=31), False),  # Not stale: 1 second longer than the data age
+        (timedelta(days=1, hours=6, minutes=30, seconds=30), True),  # Exactly equal to the data age, considered stale
+        (timedelta(days=1, hours=6, minutes=30, seconds=29), True),  # Stale: 1 second shorter than the data age
+        (timedelta(days=1), True),  # Stale: falls 6.5 hours short of the data age
+        (timedelta(hours=18), True)  # Stale: 18 hours is well below the data age
+    ],
+)
+@freeze_time("2024-12-31 12:00:00")
+def test_stale_data_check_step__no_refresh_day_num_with_time_components(
+    interval, expected, test_stale_data_check_history_df, mocker
+):
+    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_stale_data_check_history_df)
+    assert (
+        StaleDataCheckStep(table="dummy_table", interval=interval).execute().is_data_stale
+        == expected
+    )
+
+
+def test_stale_data_check_step__no_table(mocker):
+    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=None)
+    assert StaleDataCheckStep(table="dummy_table", interval=timedelta(days=1)).execute().is_data_stale
+
+
+def test_stale_data_check_step__no_modification_history(mocker, spark):
+    history_df = spark.createDataFrame(
+        [
+            {
+                "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=28, second=30),
+                "operation": "CREATE TABLE",
+            }
+        ]
+    )
+    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=history_df)
+    assert StaleDataCheckStep(table="dummy_table", interval=timedelta(days=1)).execute().is_data_stale
+
+
+@pytest.mark.parametrize(
+    "interval, refresh_day_num, expected",
+    [
+        (
+            timedelta(days=2, hours=6, minutes=30, seconds=30),
+            2,
+            False,
+        ),  # Data is not stale and it's before the refresh day
+        (
+            timedelta(days=2, hours=6, minutes=30, seconds=30),
+            1,
+            True,
+        ),  # Data is not stale but it is the refresh day
+        (
+            timedelta(days=2, hours=6, minutes=30, seconds=30),
+            0,
+            False,
+        ),  # Data is not stale and it is past the refresh day
+        (
+            timedelta(hours=6, minutes=30, seconds=30),
+            2,
+            True,
+        ),  # Data is stale and it's before the refresh day
+        (
+            timedelta(hours=6, minutes=30, seconds=30),
+            1,
+            True
+        ),  # Data is stale and it is the refresh day
+        (
+            timedelta(hours=6, minutes=30, seconds=30),
+            0,
+            True,
+        ),  # Data is stale and it is past the refresh day
+    ],
+)
+@freeze_time("2024-12-31 12:00:00")  # Tuesday
+def test_stale_data_check_step__with_refresh_day(interval, refresh_day_num, expected, test_stale_data_check_history_df, mocker):
+    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_stale_data_check_history_df)
+    assert StaleDataCheckStep(table="dummy_table", interval=interval, refresh_day_num=refresh_day_num).execute().is_data_stale == expected
+
+
+def test_stale_data_check_step__invalid_refresh_day():
+    with pytest.raises(ValueError):
+        StaleDataCheckStep(table="dummy_table", interval=timedelta(days=1), refresh_day_num=7).execute()
+
+
+def test_stale_data_check_step__invalid_staleness_period_with_refresh_day():
+    with pytest.raises(ValueError):
+        StaleDataCheckStep(table="dummy_table", interval=timedelta(days=10), refresh_day_num=5).execute()
diff --git a/tests/spark/delta/test_delta_utils.py b/tests/spark/delta/test_delta_utils.py
deleted file mode 100644
index 641f9dbb..00000000
--- a/tests/spark/delta/test_delta_utils.py
+++ /dev/null
@@ -1,118 +0,0 @@
-from datetime import datetime, timedelta
-
-from freezegun import freeze_time
-import pytest
-
-from koheesio.spark.delta.utils import StaleDataCheckStep
-
-
-@pytest.fixture
-def test_history_df(spark):
-    return spark.createDataFrame(
-        [
-            {
-                "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=29, second=30),
-                "operation": "WRITE",
-                "version": 1
-            },
-            {
-                "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=28, second=30),
-                "operation": "CREATE TABLE",
-                "version": 0
-            },
-        ]
-    )
-
-
-@pytest.mark.parametrize(
-    "interval, expected",
-    [
-        (timedelta(weeks=1), False),  # Not stale, since 1 week > 1 day and some hours ago
-        (timedelta(days=2), False),  # Not stale, since 2 days > 1 day and some hours ago
-        (timedelta(days=1, hours=7), False),  # Not stale, since 1 day and 7 hours > 1 day, 6 hours, 30 minutes
-        (timedelta(days=1, hours=6, minutes=31), False),  # Not stale, since 1 minute more than the timestamp age
-        (timedelta(days=1, hours=6, minutes=30, seconds=31), False),  # Not stale, since 1 second more than the timestamp age
-        (timedelta(days=1, hours=6, minutes=30, seconds=30), True),  # Exactly equal to the age, should be considered stale
-        (timedelta(days=1, hours=6, minutes=30, seconds=29), True),  # Stale, falls 1 second short
-        (timedelta(days=1), True),  # Stale, falls several hours and minutes short
-        (timedelta(hours=18), True)  # Stale, despite being more than half a day, but less than the full duration since the timestamp
-    ],
-)
-@freeze_time("2024-12-31 12:00:00")
-def test_stale_data_check_step__no_refresh_day_num_with_time_components(
-    interval, expected, test_history_df, mocker
-):
-    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df)
-    assert (
-        StaleDataCheckStep(table="dummy_table", interval=interval).execute().is_data_stale
-        == expected
-    )
-
-
-def test_stale_data_check_step__no_table(mocker):
-    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=None)
-    assert StaleDataCheckStep(table="dummy_table", interval=timedelta(days=1)).execute().is_data_stale
-
-
-def test_stale_data_check_step__no_modification_history(mocker, spark):
-    history_df = spark.createDataFrame(
-        [
-            {
-                "timestamp": datetime(year=2024, month=12, day=30, hour=5, minute=28, second=30),
-                "operation": "CREATE TABLE",
-            }
-        ]
-    )
-    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=history_df)
-    assert StaleDataCheckStep(table="dummy_table", interval=timedelta(days=1)).execute().is_data_stale
-
-
-@pytest.mark.parametrize(
-    "interval, refresh_day_num, expected",
-    [
-        (
-            timedelta(days=2, hours=6, minutes=30, seconds=30),
-            2,
-            False,
-        ),  # Data is not stale and it's before the refresh day
-        (
-            timedelta(days=2, hours=6, minutes=30, seconds=30),
-            1,
-            True,
-        ),  # Data is not stale but it is the refresh day
-        (
-            timedelta(days=2, hours=6, minutes=30, seconds=30),
-            0,
-            False,
-        ),  # Data is not stale and it is past the refresh day
-        (
-            timedelta(hours=6, minutes=30, seconds=30),
-            2,
-            True,
-        ),  # Data is stale and it's before the refresh day
-        (
-            timedelta(hours=6, minutes=30, seconds=30),
-            1,
-            True
-        ),  # Data is stale and it is the refresh day
-        (
-            timedelta(hours=6, minutes=30, seconds=30),
-            0,
-            True,
-        ),  # Data is stale and it is past the refresh day
-    ],
-)
-@freeze_time("2024-12-31 12:00:00")  # Tuesday
-def test_stale_data_check_step__with_refresh_day(interval, refresh_day_num, expected, test_history_df, mocker):
-    mocker.patch("koheesio.spark.delta.DeltaTableStep.describe_history", return_value=test_history_df)
-    assert StaleDataCheckStep(table="dummy_table", interval=interval, refresh_day_num=refresh_day_num).execute().is_data_stale == expected
-
-
-def test_stale_data_check_step__invalid_refresh_day():
-    with pytest.raises(ValueError):
-        StaleDataCheckStep(table="dummy_table", interval=timedelta(days=1), refresh_day_num=7).execute()
-
-
-def test_stale_data_check_step__invalid_staleness_period_with_refresh_day():
-    with pytest.raises(ValueError):
-        StaleDataCheckStep(table="dummy_table", interval=timedelta(days=10), refresh_day_num=5).execute()
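
Note (outside the diff): the staleness check in this patch reduces to a single cut-off comparison, `last_modification_timestamp <= current_time - interval`. A minimal, self-contained sketch of that boundary behaviour, reusing the frozen clock and last-modification timestamp from the tests above (the `is_stale` helper here is illustrative, not part of the patch):

```python
from datetime import datetime, timedelta


def is_stale(last_modified: datetime, interval: timedelta, now: datetime) -> bool:
    # Mirrors StaleDataCheckStep: stale when the last modification
    # happened at or before the cut-off (now - interval).
    return last_modified <= now - interval


now = datetime(2024, 12, 31, 12, 0, 0)             # frozen test clock
last_modified = datetime(2024, 12, 30, 5, 29, 30)  # data age: 1 day, 6:30:30

assert is_stale(last_modified, timedelta(days=1, hours=6, minutes=30, seconds=30), now)      # boundary: stale
assert not is_stale(last_modified, timedelta(days=1, hours=6, minutes=30, seconds=31), now)  # 1s longer: fresh
```

The `<=` in the comparison is why the "exactly equal to the data age" case in the parametrized test is expected to be stale.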

From 64527f211e37e5baf509454b092b6aefd8cf6008 Mon Sep 17 00:00:00 2001
From: zarembat
Date: Thu, 6 Feb 2025 13:24:00 +0100
Subject: [PATCH 6/6] Restored test_delta.py

---
 tests/spark/{delta => }/test_delta.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename tests/spark/{delta => }/test_delta.py (100%)

diff --git a/tests/spark/delta/test_delta.py b/tests/spark/test_delta.py
similarity index 100%
rename from tests/spark/delta/test_delta.py
rename to tests/spark/test_delta.py
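
Usage note for the series as a whole: a minimal sketch of how the new step is intended to be called (assumes an active SparkSession with Delta support; the table name `my_schema.my_table` is illustrative, not part of the patches):

```python
from datetime import timedelta

from koheesio.spark.delta import DeltaTableStep, StaleDataCheckStep

# Illustrative table name; any existing Delta table works here.
table = DeltaTableStep(table="my_schema.my_table")

# Stale when the last data modification is at least 2 days old,
# or when today is Monday (refresh_day_num=0), whichever triggers first.
check = StaleDataCheckStep(table=table, interval=timedelta(days=2), refresh_day_num=0)
if check.execute().is_data_stale:
    ...  # kick off the refresh pipeline here

# The interval may equivalently be given as an ISO-8601 duration string,
# which pydantic coerces to a timedelta (see Example 4 in the docstring):
check = StaleDataCheckStep(table=table, interval="P2D", refresh_day_num=0)
```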