
Fix #1001: [Lake][ETL] Implement incremental ETL pipeline #1423

Merged: 85 commits, Jul 31, 2024
b21eecf
pseudo code for insert + update routine. also provided example for br…
idiom-bytes May 29, 2024
9f6597d
I don't know why this didn't get included/pushed
idiom-bytes May 30, 2024
5abf219
First pass at going from pseudo code to something that should be clos…
idiom-bytes Jun 5, 2024
3d7d5d2
cleaning up imports, logic, and wiring through the new ETL sequence s…
idiom-bytes Jun 7, 2024
931beeb
making prediction query complete
idiom-bytes Jun 7, 2024
a0e3f0d
fixed raw_predictions query
idiom-bytes Jun 10, 2024
9e92496
First pass at integrating payouts and using whatever good data is ava…
idiom-bytes Jun 10, 2024
f496452
Implemented routine of updating new and historical records.
idiom-bytes Jun 10, 2024
61ee603
Getting query to complete, there are some join issues manifesting them…
idiom-bytes Jun 10, 2024
d23a310
started debugging etl and fixing up the code. both predictions and pa…
idiom-bytes Jun 10, 2024
83e493a
etl is completing end-to-end on the first run, as a result we have ne…
idiom-bytes Jun 10, 2024
dc79dc1
cleaning up various logs and prints now that the pipeline is stable s…
idiom-bytes Jun 11, 2024
70f618f
Removed a ton of logs that are making the data pipeline impossible to…
idiom-bytes Jun 11, 2024
8d80db7
cleaning up more logs and debugging issues
idiom-bytes Jun 11, 2024
1924832
started to update tests so we can have a verifiable test.
idiom-bytes Jun 12, 2024
2d578ff
Updated etl such that it enforces null values, allowing me to verify …
idiom-bytes Jun 12, 2024
e11a46d
Fixing up validation output
idiom-bytes Jun 12, 2024
5b71921
Exported test data for a more thorough fixture
idiom-bytes Jun 12, 2024
bebf68c
Got the first test setup, loading csvs into duckdb for ETL, and valid…
idiom-bytes Jun 12, 2024
9a35e50
hooking up sample_data to new test_etl flows. added some basic assers…
idiom-bytes Jun 12, 2024
cebb29f
Added asserts to more easily visualize the etl work and output
idiom-bytes Jun 12, 2024
f53e569
Added clamp, so ETL is forced to read from ppss rather than do its i…
idiom-bytes Jun 13, 2024
de9842b
Adjusted test to cover 1/4 of the data, and reviewed implementation to…
idiom-bytes Jun 13, 2024
c20a86f
first pass on e2e incremental test
idiom-bytes Jun 13, 2024
4cffc1a
adding a bit of structure to the test so it's easier to manage
idiom-bytes Jun 13, 2024
3adef89
cleaned up test so I can more easily validate issues happening in step2
idiom-bytes Jun 13, 2024
291bc6f
small comments
idiom-bytes Jun 13, 2024
80681a7
restructured the swap strategy so that it can complete executing step…
idiom-bytes Jun 17, 2024
531657f
many null rows addressed but now it's generating extra valid payouts
idiom-bytes Jun 18, 2024
5d8bd8b
getting nearly the exact output that i'm expecting
idiom-bytes Jun 18, 2024
a6ec93e
removing temp artifacts that I'm using to debug output
idiom-bytes Jun 18, 2024
ded68dc
couple tweaks to improve test legibility, and to reduce spam in logs
idiom-bytes Jun 18, 2024
d26849d
st_ts and end_ts are now including all records, both tests return the…
idiom-bytes Jun 18, 2024
154f5e0
Created function that generates atomic swap transaction and executes …
idiom-bytes Jun 18, 2024
9bb6669
disabled old etl tests and verified everything is running e2e
idiom-bytes Jun 18, 2024
2e844f8
adjusting less than greater
idiom-bytes Jun 20, 2024
1c11b4a
Fixing up tests, cleaned up names, and refactor a large chunk of the …
idiom-bytes Jun 25, 2024
0ee3b9b
fixing up tests
idiom-bytes Jun 25, 2024
7988ce1
Cleaning up...
idiom-bytes Jun 25, 2024
c941a81
Fixing duplicate issues, I believe I found one of them but discovered…
idiom-bytes Jun 25, 2024
c06c1e7
updating tests to improve coverage/tests/validations
idiom-bytes Jun 26, 2024
d37c64f
fixing black
idiom-bytes Jun 26, 2024
25b0401
added logic in etl to identify first_run, and address _gte or _gt con…
idiom-bytes Jun 27, 2024
db78f5d
Now, pushed fix for duplicate
idiom-bytes Jun 27, 2024
2846d6d
Added unmatched payout validation
idiom-bytes Jun 28, 2024
e9773c5
fixing violations, also changed it so that the validation is done aga…
idiom-bytes Jun 28, 2024
ce3106c
updating cli docs
idiom-bytes Jun 28, 2024
b90779a
Updated documentation, added html/mermaid diagram so we can maintain …
idiom-bytes Jul 8, 2024
673b500
Updating documentation and integrating visuals
idiom-bytes Jul 10, 2024
f5fe8c5
formatting
idiom-bytes Jul 10, 2024
8bbad35
fixing hanging issues from cherrypick
idiom-bytes Jul 19, 2024
f58992f
again, fixing hanging issues from cherrypick
idiom-bytes Jul 19, 2024
9af8a16
getting all tests to pass
idiom-bytes Jul 19, 2024
074fa6e
getting tests to pass, fixed linter, formatter, etc..
idiom-bytes Jul 19, 2024
84e498a
added if inversion to make block easier to read
idiom-bytes Jul 19, 2024
99f831d
fixing formatting
idiom-bytes Jul 19, 2024
7ca76c8
changing is_etl_table to use a loop
idiom-bytes Jul 19, 2024
f9ecd98
fixed black formatting
idiom-bytes Jul 19, 2024
df06672
fixing mypy
idiom-bytes Jul 19, 2024
f254009
renamed drop_from_table_using_reference_table_ids to drop_common_reco…
idiom-bytes Jul 23, 2024
067130b
improved function names and removed hanging print statements
idiom-bytes Jul 23, 2024
a08cd67
cleaned up table_name function
idiom-bytes Jul 23, 2024
386e408
Appended no_unmatched_payouts violation
idiom-bytes Jul 23, 2024
416e8b5
reintroduced helper text for lake, and moved around cli_lake function…
idiom-bytes Jul 23, 2024
7d580f7
reverting ppss.yaml
idiom-bytes Jul 23, 2024
827e666
fixing is_etl_table logic to use enum values
idiom-bytes Jul 23, 2024
4c455e8
cleaning up old prints and comments
idiom-bytes Jul 24, 2024
cd7c65c
implementing Calina's feedback
idiom-bytes Jul 24, 2024
ade35ce
fixing formatting
idiom-bytes Jul 24, 2024
0c4dd54
Merge branch 'main' into issue1001-incremental-etl
idiom-bytes Jul 24, 2024
8549a2d
Merge branch 'main' into issue1001-incremental-etl
calina-c Jul 24, 2024
0bcc67c
fixing small issues that are keeping lake from running correctly
idiom-bytes Jul 25, 2024
5a5b373
fix black
idiom-bytes Jul 25, 2024
1850174
Merge branch 'main' into issue1001-incremental-etl
idiom-bytes Jul 25, 2024
d2b82f5
fixing pylint
idiom-bytes Jul 25, 2024
02bbce5
Merge branch 'issue1457-fix-lake' into issue1001-incremental-etl
idiom-bytes Jul 25, 2024
37a9bed
Merge branch 'main' into issue1457-fix-lake
idiom-bytes Jul 25, 2024
32e0e69
reverting check_network issue
idiom-bytes Jul 25, 2024
ad2a2ac
Merge branch 'main' into issue1001-incremental-etl
idiom-bytes Jul 25, 2024
b5feb0e
Merge branch 'issue1457-fix-lake' into issue1001-incremental-etl
idiom-bytes Jul 25, 2024
d4e671a
there was an extra row_count
idiom-bytes Jul 25, 2024
e151362
fixing formatting
idiom-bytes Jul 25, 2024
ebb8bc5
fixing truevalues in payouts
idiom-bytes Jul 29, 2024
a98433c
fixing mock data
idiom-bytes Jul 29, 2024
2e0b0ea
getting PR up-to-date with main
idiom-bytes Jul 30, 2024
76 changes: 76 additions & 0 deletions READMEs/diagrams/lake.html
@@ -0,0 +1,76 @@
<html>
<head>
<title>ETL Architecture</title>
<style>
body {
font-family: Arial, sans-serif;
text-align: center;
background-color: #f0f0f0;
margin: 40px;
}

.banner {
font-size: 24px;
margin-bottom: 20px;
}

.pink {
color: rgb(255, 64, 146);
}
</style>
<script src="https://cdn.jsdelivr.net/npm/mermaid@8/dist/mermaid.min.js"></script>
<script>
mermaid.initialize({
startOnLoad: true,
theme: 'forest',
});
</script>
</head>
<body>
<div class="banner">
Predict<span class="pink">oo</span>r Tables &amp; Lake
</div>
<div class="mermaid">
erDiagram
SUBGRAPH_predictPredictions ||--o{ PDR_PREDICTIONS : yields
SUBGRAPH_predictPayouts ||--o{ PDR_PAYOUTS : yields
PDR_PREDICTIONS ||--o{ BRONZE_PDR_PREDICTIONS : yields
PDR_PAYOUTS ||--o{ BRONZE_PDR_PREDICTIONS : yields
PDR_PREDICTIONS {
string ID
string contract
int slot
string user
string pair
string timeframe
string source
int timestamp
}
PDR_PAYOUTS {
string ID
string contract
int slot
string user
boolean predvalue
float stake
float payout
int timestamp
}
BRONZE_PDR_PREDICTIONS {
string ID
string slot_id
string contract
int slot
string user
string pair
string timeframe
string source
boolean predvalue
float stake
float payout
int timestamp
int last_event_timestamp
}
</div>
</body>
</html>
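The ER diagram above says each `BRONZE_PDR_PREDICTIONS` row is a `PDR_PREDICTIONS` row enriched with its matching `PDR_PAYOUTS` row. A minimal Python sketch of that enrichment, using the field names from the diagram — the join on prediction `ID` and the nulls-until-payout behavior are assumptions drawn from the diagram and commit notes, not the real ETL SQL:

```python
def build_bronze_predictions(predictions, payouts):
    """Enrich each prediction row with its matching payout row, if any."""
    # index payouts by prediction ID for O(1) lookup (assumed join key)
    payouts_by_id = {p["ID"]: p for p in payouts}
    bronze = []
    for pred in predictions:
        payout = payouts_by_id.get(pred["ID"])
        bronze.append({
            "ID": pred["ID"],
            "contract": pred["contract"],
            "slot": pred["slot"],
            "user": pred["user"],
            "pair": pred["pair"],
            "timeframe": pred["timeframe"],
            "source": pred["source"],
            # payout-derived fields stay null until the payout event arrives
            "predvalue": payout["predvalue"] if payout else None,
            "stake": payout["stake"] if payout else None,
            "payout": payout["payout"] if payout else None,
            "timestamp": pred["timestamp"],
            "last_event_timestamp": (
                payout["timestamp"] if payout else pred["timestamp"]
            ),
        })
    return bronze
```

On an incremental run, re-processing rows whose payout arrived late is what updates "historical records" in place, per the commit history above.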
Binary file added READMEs/images/etl_checkpoints.png
Binary file added READMEs/images/gql_data_factory_fetch.png
Binary file added READMEs/images/gql_use_cache_and_swap.png
Binary file modified READMEs/images/lake_tables_diagram.png
275 changes: 126 additions & 149 deletions READMEs/lake-and-etl.md

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pdr_backend/analytics/get_predictions_info.py
@@ -17,7 +17,7 @@
plot_traction_daily_statistics,
)
from pdr_backend.lake.duckdb_data_store import DuckDBDataStore
from pdr_backend.lake.table import NamedTable
from pdr_backend.lake.table import Table
from pdr_backend.ppss.ppss import PPSS
from pdr_backend.util.time_types import UnixTimeMs

@@ -45,7 +45,7 @@ def get_predictions_info_main(
logger.info("get_predictions_info_main start_timestr %s", start_timestr)
logger.info("get_predictions_info_main end_timestr %s", end_timestr)

table_name = NamedTable("pdr_predictions").fullname
table_name = Table("pdr_predictions").table_name

# convert feed addresses to string for SQL query
feed_addrs_str = _address_list_to_str(feed_addrs)
@@ -74,7 +74,7 @@ def get_predictoors_info_main(
logger.info(
"get_predictoors_info_main_ppss.lake_ss.lake_dir--- %s", ppss.lake_ss.lake_dir
)
table_name = NamedTable("pdr_predictions").fullname
table_name = Table("pdr_predictions").table_name

# convert feed addresses to string for SQL query
pdr_addrs_str = _address_list_to_str(pdr_addrs)
@@ -98,7 +98,7 @@ def get_predictoors_info_main(

@enforce_types
def get_traction_info_main(ppss: PPSS, start_timestr: str, end_timestr: str):
table_name = NamedTable("pdr_predictions").fullname
table_name = Table("pdr_predictions").table_name

query = f"""
SELECT *,
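The hunks above consistently swap `NamedTable("pdr_predictions").fullname` for `Table("pdr_predictions").table_name`. A minimal sketch of what the renamed interface might look like — the `_temp_` prefixing is an assumption for illustration; the real class lives in `pdr_backend/lake/table.py`:

```python
class Table:
    """Hypothetical sketch of the renamed lake table wrapper."""

    def __init__(self, base_table_name: str, is_temp: bool = False):
        # _base_table_name is also what CSVDataStore.from_table reads
        self._base_table_name = base_table_name
        self._is_temp = is_temp

    @property
    def table_name(self) -> str:
        # temp build tables carry a _temp_ prefix; final tables do not
        if self._is_temp:
            return f"_temp_{self._base_table_name}"
        return self._base_table_name
```

The rename drops the `NamedTable`/`fullname` vocabulary in favor of one `Table` type whose `table_name` is the single source of truth for SQL queries.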
10 changes: 5 additions & 5 deletions pdr_backend/analytics/test/test_get_predictions_info.py
@@ -9,7 +9,7 @@
from enforce_typing import enforce_types
from pdr_backend.analytics.get_predictions_info import get_predictions_info_main
from pdr_backend.lake.prediction import Prediction
from pdr_backend.lake.table import NamedTable
from pdr_backend.lake.table import Table
from pdr_backend.ppss.ppss import mock_ppss


@@ -34,7 +34,7 @@ def test_get_predictions_info_main_mainnet(
fin_timestr=fin_timestr,
)
predictions_df = _gql_datafactory_first_predictions_df
predictions_table = NamedTable.from_dataclass(Prediction)
predictions_table = Table.from_dataclass(Prediction)
predictions_table.append_to_storage(predictions_df, ppss)

feed_addr = "0x2d8e2267779d27c2b3ed5408408ff15d9f3a3152"
@@ -99,7 +99,7 @@ def test_get_predictions_info_bad_date_range(
)

predictions_df = _gql_datafactory_first_predictions_df
predictions_table = NamedTable.from_dataclass(Prediction)
predictions_table = Table.from_dataclass(Prediction)
predictions_table.append_to_storage(predictions_df, ppss)

feed_addr = "0x2d8e2267779d27c2b3ed5408408ff15d9f3a3152"
@@ -154,7 +154,7 @@ def test_get_predictions_info_bad_feed(
)

predictions_df = _gql_datafactory_first_predictions_df
predictions_table = NamedTable.from_dataclass(Prediction)
predictions_table = Table.from_dataclass(Prediction)
predictions_table.append_to_storage(predictions_df, ppss)

feed_addr = "0x8e0we267779d27c2b3ed5408408ff15d9f3a3152"
@@ -196,7 +196,7 @@ def test_get_predictions_info_empty(_gql_datafactory_first_predictions_df, tmpdi
fin_timestr=fin_timestr,
)

predictions_table = NamedTable.from_dataclass(Prediction)
predictions_table = Table.from_dataclass(Prediction)
predictions_table.append_to_storage(
pl.DataFrame([], schema=Prediction.get_lake_schema()), ppss
)
8 changes: 4 additions & 4 deletions pdr_backend/analytics/test/test_get_predictoors_info.py
@@ -10,7 +10,7 @@

from pdr_backend.analytics.get_predictions_info import get_predictoors_info_main
from pdr_backend.lake.prediction import Prediction
from pdr_backend.lake.table import NamedTable
from pdr_backend.lake.table import Table
from pdr_backend.ppss.ppss import mock_ppss


@@ -32,7 +32,7 @@ def test_get_predictoors_info_main_mainnet(
)

predictions_df = _gql_datafactory_first_predictions_df
predictions_table = NamedTable.from_dataclass(Prediction)
predictions_table = Table.from_dataclass(Prediction)
predictions_table.append_to_storage(predictions_df, ppss)

user_addr = "0xaaaa4cb4ff2584bad80ff5f109034a891c3d88dd"
@@ -84,7 +84,7 @@ def test_get_predictoors_info_bad_date_range(
)

predictions_df = _gql_datafactory_first_predictions_df
predictions_table = NamedTable.from_dataclass(Prediction)
predictions_table = Table.from_dataclass(Prediction)
predictions_table.append_to_storage(predictions_df, ppss)

user_addr = "0xaaaa4cb4ff2584bad80ff5f109034a891c3d88dd"
@@ -134,7 +134,7 @@ def test_get_predictoors_info_bad_user_address(
)

predictions_df = _gql_datafactory_first_predictions_df
predictions_table = NamedTable.from_dataclass(Prediction)
predictions_table = Table.from_dataclass(Prediction)
predictions_table.append_to_storage(predictions_df, ppss)

user_addr = "0xbbbb4cb4ff2584bad80ff5f109034a891c3d223"
6 changes: 3 additions & 3 deletions pdr_backend/analytics/test/test_get_traction_info.py
@@ -10,7 +10,7 @@

from pdr_backend.analytics.get_predictions_info import get_traction_info_main
from pdr_backend.lake.prediction import Prediction
from pdr_backend.lake.table import NamedTable
from pdr_backend.lake.table import Table
from pdr_backend.ppss.ppss import mock_ppss


@@ -40,7 +40,7 @@ def test_get_traction_info_main_mainnet(
)

predictions_df = _gql_datafactory_daily_predictions_df
predictions_table = NamedTable.from_dataclass(Prediction)
predictions_table = Table.from_dataclass(Prediction)
predictions_table.append_to_storage(predictions_df, ppss)

get_traction_info_main(ppss, st_timestr, fin_timestr)
@@ -80,7 +80,7 @@ def test_get_traction_info_empty_data(
fin_timestr=fin_timestr,
)

pdr_prediction_table = NamedTable.from_dataclass(Prediction)
pdr_prediction_table = Table.from_dataclass(Prediction)
pdr_prediction_table.append_to_storage(
pl.DataFrame([], schema=Prediction.get_lake_schema()), ppss
)
2 changes: 0 additions & 2 deletions pdr_backend/cli/cli_arguments.py
@@ -48,9 +48,7 @@
pdr arima_plots PPSS_FILE [--debug_mode False]
pdr deployer (for >1 predictoor bots)
pdr lake raw|etl update PPSS_FILE NETWORK
pdr lake raw|etl drop PPSS_FILE NETWORK ST
pdr lake describe --HTML PPSS_FILE NETWORK
pdr lake validate PPSS_FILE NETWORK
pdr analytics PPSS_FILE NETWORK

Utilities:
1 change: 1 addition & 0 deletions pdr_backend/cli/cli_module_lake.py
@@ -71,6 +71,7 @@ def do_lake_query(args, ppss):
try:
df = db.query_data(args.QUERY)
print(df)
print("Rows:", len(df))
except Exception as e:
logger.error("Error querying lake: %s", e)
print(e)
16 changes: 9 additions & 7 deletions pdr_backend/cli/test/test_cli_module_lake.py
@@ -173,11 +173,11 @@ def test_do_lake_raw_drop(tmpdir, caplog):

assert "drop table _temp_test1 starting at 1609459200000" in caplog.text
assert "rows before: 5" in caplog.text
assert "rows after: 2" in caplog.text
assert "rows after: 3" in caplog.text
assert "drop table test2 starting at 1609459200000" in caplog.text
assert "rows before: 5" in caplog.text
assert "rows after: 3" in caplog.text
assert "truncated 5 rows from 2 tables" in caplog.text
assert "truncated 8 rows from 3 tables" in caplog.text


@enforce_types
Expand All @@ -193,8 +193,8 @@ def test_do_lake_etl_drop(tmpdir, caplog):

db = DuckDBDataStore(str(tmpdir))
_make_and_fill_timestamps(db, "_temp_bronze_test1", ts - 3 * one_day)
_make_and_fill_timestamps(db, "_etl_silver_test2", ts - 2 * one_day)
_make_and_fill_timestamps(db, "_etl_test_raw", ts - 2 * one_day)
_make_and_fill_timestamps(db, "_new_events_silver_test2", ts - 2 * one_day)
_make_and_fill_timestamps(db, "_unknown_test_raw", ts - 2 * one_day)

mock_ppss = Mock()

@@ -203,11 +203,13 @@

assert "drop table _temp_bronze_test1 starting at 1609459200000" in caplog.text
assert "rows before: 5" in caplog.text
assert "rows after: 2" in caplog.text
assert "drop table _etl_silver_test2 starting at 1609459200000" in caplog.text
assert "rows after: 3" in caplog.text
assert (
"drop table _new_events_silver_test2 starting at 1609459200000" in caplog.text
)
assert "rows before: 5" in caplog.text
assert "rows after: 3" in caplog.text
assert "skipping non-etl table _etl_test_raw" in caplog.text
assert "skipping non-etl table _unknown_test_raw" in caplog.text
assert "truncated 5 rows from 2 tables" in caplog.text


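The updated test expects `_temp_bronze_test1` and `_new_events_silver_test2` to be dropped while `_unknown_test_raw` is skipped as a non-ETL table, and commit 7ca76c8 says `is_etl_table` was changed to use a loop. A hedged sketch of such a check — the prefix lists are assumptions inferred from the table names exercised in the test, not the real enum values:

```python
# assumed ETL table prefixes (the real code uses enum values)
ETL_TABLE_PREFIXES = ["bronze_", "silver_"]
# assumed build-stage prefixes stripped before matching
STAGE_PREFIXES = ["_temp_", "_new_events_"]

def is_etl_table(table_name: str) -> bool:
    """True if the table (ignoring any build-stage prefix) is an ETL table."""
    for stage_prefix in STAGE_PREFIXES:
        if table_name.startswith(stage_prefix):
            table_name = table_name[len(stage_prefix):]
            break
    # loop over known ETL prefixes rather than hard-coded comparisons
    return any(table_name.startswith(p) for p in ETL_TABLE_PREFIXES)
```

Tables that fail this check are logged as "skipping non-etl table …" and left untouched by `pdr lake etl drop`.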
3 changes: 2 additions & 1 deletion pdr_backend/lake/csv_data_store.py
@@ -128,7 +128,7 @@ def __init__(self, base_path: str, table_name: str):

@staticmethod
def from_table(table, ppss):
return CSVDataStore(ppss.lake_ss.lake_dir, table.table_name)
return CSVDataStore(ppss.lake_ss.lake_dir, table._base_table_name)

@enforce_types
def _create_file_name(self, start_time: int, end_time: Optional[int]) -> str:
@@ -254,6 +254,7 @@ def _append_remaining_rows(

remaining_data = data.slice(0, remaining_rows)
last_file_path = self._get_last_file_path()

last_file_data = pl.read_csv(last_file_path, schema=schema)
last_file_data = last_file_data.vstack(remaining_data).rechunk()

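The `_append_remaining_rows` hunk above reads the last CSV file, vstacks the leftover rows, and rewrites it (via polars). A stdlib-only sketch of the same idea, assuming rows arrive as dicts keyed by the file's existing header:

```python
import csv

def append_remaining_rows(last_file_path, remaining_rows):
    """Append rows (list of dicts) to the last CSV file, keeping its header."""
    # read the existing header so appended rows use the same column order
    with open(last_file_path, newline="") as f:
        fieldnames = csv.DictReader(f).fieldnames
    # append in place; no need to rewrite the rows already on disk
    with open(last_file_path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writerows(remaining_rows)
```

Unlike the polars version, this appends rather than rewriting the whole file; it assumes the existing file ends with a newline.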