DEGEN ETL Improvements [V1] #10

Open · wants to merge 66 commits into base: main
Changes from 37 commits
Commits (66)
18a0634
support for start-block and file arg
manjeet9727 Apr 7, 2024
854cd2f
check messages in set before publishing
manjeet9727 Apr 7, 2024
6bacb79
support for bloom-filters
manjeet9727 Apr 7, 2024
e1d4a48
support for prometheus-metrics
manjeet9727 Apr 7, 2024
3a28809
change sorting order & minor redis changes
manjeet9727 Apr 8, 2024
42d8c6d
Merge branch 'main' of github.com:Airstack-xyz/ethereum-etl into feat…
manjeet9727 Apr 8, 2024
fc4c540
refactor code
manjeet9727 Apr 8, 2024
2f0298d
refactor code
manjeet9727 Apr 8, 2024
00d0f82
refactor code
manjeet9727 Apr 8, 2024
acf6122
Update setup.py
manjeet9727 Apr 8, 2024
5db3cd3
fix: lz4
manjeet9727 Apr 8, 2024
77871a3
Merge pull request #14 from Airstack-xyz/main
manjeet9727 Apr 8, 2024
2adc05b
Support for both start-block vs last-synced-file
manjeet9727 Apr 8, 2024
eb42f62
Support for both start-block vs last-synced-file
manjeet9727 Apr 9, 2024
acb31a8
Merge branch 'main' into feat/degen-etl-improvements
manjeet9727 Apr 9, 2024
be93dc5
Merge branch 'feat/degen-start-block' into feat/degen-etl-improvements
manjeet9727 Apr 9, 2024
9ce6504
WIP: CH support
manjeet9727 Apr 9, 2024
14bfff6
FIX: balance traces
manjeet9727 Apr 9, 2024
2b17ec7
FIX: balance traces
manjeet9727 Apr 9, 2024
42a87a9
FIX: balance traces
manjeet9727 Apr 9, 2024
c330081
Merge branch 'fix/traces' into feat/degen-etl-improvements
manjeet9727 Apr 9, 2024
ccdc5d2
process blocks at the end
manjeet9727 Apr 9, 2024
022e5c0
WIP: CH support
manjeet9727 Apr 9, 2024
52dfa13
WIP: CH support
manjeet9727 Apr 9, 2024
f622926
support for redis lock
manjeet9727 Apr 9, 2024
1d52c8e
support for redis lock
manjeet9727 Apr 10, 2024
a769547
WIP dedup logic
manjeet9727 Apr 10, 2024
5ee0a0b
Added support for dedup using CH
manjeet9727 Apr 10, 2024
c52eb78
Added support for dedup using CH
manjeet9727 Apr 10, 2024
d03e88c
Added support for dedup using CH
manjeet9727 Apr 10, 2024
975dea2
Added support for dedup using CH
manjeet9727 Apr 10, 2024
9e76bef
Added support for dedup using CH
manjeet9727 Apr 10, 2024
38fb76a
Added support for dedup using CH
manjeet9727 Apr 10, 2024
a2fffaf
Added support for dedup using CH
manjeet9727 Apr 10, 2024
4c99c0e
Added support for dedup using CH
manjeet9727 Apr 10, 2024
a82e57b
Added backfill jobs
manjeet9727 Apr 11, 2024
b71fb0f
Added backfill jobs
manjeet9727 Apr 11, 2024
89ee8a7
Merge pull request #15 from Airstack-xyz/feat/degen-start-block
0xsarvesh Apr 14, 2024
02e5b87
Added more logs
manjeet9727 Apr 14, 2024
b59df5f
make last-synced-block-file as required param
manjeet9727 Apr 15, 2024
d181885
fix: str traces value
manjeet9727 Apr 15, 2024
35fed03
Merge branch 'develop' into feat/degen-etl-improvements
manjeet9727 Apr 15, 2024
ebb3dd8
added details for EVM Transfers
manjeet9727 Apr 16, 2024
6d6b8df
code review fixes
manjeet9727 Apr 16, 2024
f90e705
code review fixes
manjeet9727 Apr 16, 2024
6e28885
revert: str traces value
manjeet9727 Apr 17, 2024
4f451f4
WIP: backmerge till ca54ef6
manjeet9727 Apr 23, 2024
e438604
fix backmerge
manjeet9727 Apr 24, 2024
d1c63f1
fix backmerge
manjeet9727 Apr 24, 2024
f62ca04
Merge branch 'fix/traces' into feat/degen-etl-improvements
manjeet9727 Apr 24, 2024
06cef7e
Merge branch 'feat/degen-etl-improvements' into feat/degen-v2-backmerge
manjeet9727 Apr 24, 2024
39c407f
Merge branch 'feat/degen-etl-improvements' into feat/degen-etl-improv…
manjeet9727 Apr 24, 2024
b157061
Merge branch 'feat/degen-v2-backmerge' into feat/etl-improvements-com…
manjeet9727 Apr 24, 2024
bf5899e
fix: add util to convert numric to string
manjeet9727 Apr 24, 2024
dbf7f2d
feat: produce to kafka and then mark in redis
manjeet9727 Apr 25, 2024
c60b1d4
update table mapping for token stream
manjeet9727 Apr 25, 2024
2f7b41c
Revert "WIP: backmerge till ca54ef6"
manjeet9727 Apr 25, 2024
ebf5079
fix: do not handle produce error
manjeet9727 Apr 28, 2024
94e2eb6
test: dont log produce message
manjeet9727 Apr 29, 2024
68fb8a3
wip: test futures handling
manjeet9727 Apr 30, 2024
ce4812a
feat: raise exception
manjeet9727 May 14, 2024
563197b
add AttributeError except for future
14MR May 20, 2024
5ebe998
filter out None values in futures
14MR May 20, 2024
0d1be65
move marking redis after kafka message was sent
14MR May 23, 2024
d4cea15
set acks='all', linger_ms=5
14MR May 28, 2024
a28ff29
Merge pull request #20 from Airstack-xyz/feat/etl-improvements-v2
14MR Jun 6, 2024
1 change: 1 addition & 0 deletions .python-version
@@ -0,0 +1 @@
3.9.6
32 changes: 24 additions & 8 deletions blockchainetl/jobs/exporters/kafka_exporter.py
@@ -7,24 +7,24 @@
from kafka import KafkaProducer

from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter


from ethereumetl.deduplication.redis import RedisConnector

class KafkaItemExporter:

def __init__(self, item_type_to_topic_mapping, converters=()):
self.item_type_to_topic_mapping = item_type_to_topic_mapping
self.converter = CompositeItemConverter(converters)
self.redis = RedisConnector()
Member: There should be a feature flag to turn the deduplication logic on and off.


self.connection_url = self.get_connection_url()
print(self.connection_url)
self.producer = KafkaProducer(
bootstrap_servers=self.connection_url,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=os.getenv('KAFKA_SCRAM_USERID'),
sasl_plain_password=os.getenv('KAFKA_SCRAM_PASSWORD'),
client_id=socket.gethostname(),
compression_type='lz4',
compression_type=os.environ.get('KAFKA_COMPRESSION', 'lz4'),
request_timeout_ms= 60000,
max_block_ms= 120000,
buffer_memory= 100000000)
@@ -38,16 +38,24 @@ def get_connection_url(self):
def open(self):
pass

def export_items(self, items):
def export_items(self, items):
for item in items:
self.export_item(item)

def export_item(self, item):
item_type = item.get('type')
if item_type is not None and item_type in self.item_type_to_topic_mapping:
item_id = item.get('id')

if item_id is not None and item_type is not None and item_type in self.item_type_to_topic_mapping:
item_type = self.item_type_to_topic_mapping[item_type]
data = json.dumps(item).encode('utf-8')
# logging.debug(data)
return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)

if not self.already_processed(item_type, item_id):
logging.info(f'Processing message of Type=[{item_type}]; Id=[{item_id}]')
self.mark_processed(item_type, item_id)
return self.producer.send(item_type, value=data)
Member: Add error handling and log when an item has been marked as processed but could not be produced (see the hedged sketch after this file's diff).


logging.info(f'Message was already processed skipping... Type=[{item_type}]; Id=[{item_id}]')
else:
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))

@@ -56,9 +64,17 @@ def convert_items(self, items):
yield self.converter.convert_item(item)

def close(self):
self.redis.close()
pass

# utility function to set message as processed in Redis
def mark_processed(self, item_type, item_id):
return self.redis.add_to_set(item_type, item_id)

# utility functions to check message was already processed or not
def already_processed(self, item_type, item_id):
return self.redis.exists_in_set(item_type, item_id)

def group_by_item_type(items):
result = collections.defaultdict(list)
for item in items:
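
Responding to the two review notes above, the following is a minimal, hedged sketch of how the exporter could gate deduplication behind a feature flag and mark an item in Redis only after Kafka confirms the send. The `DEDUP_ENABLED` variable name and the helper wiring are assumptions for illustration, not part of this PR; the only library behaviour relied on is that kafka-python's `KafkaProducer.send()` returns a future exposing `add_callback`/`add_errback`.

```python
import json
import logging
import os


def export_item_with_dedup_flag(exporter, item):
    """Hypothetical variant of KafkaItemExporter.export_item (not the PR's code)."""
    dedup_enabled = os.environ.get('DEDUP_ENABLED', 'true').lower() == 'true'  # assumed flag name
    item_type = item.get('type')
    item_id = item.get('id')

    if item_type is None or item_type not in exporter.item_type_to_topic_mapping or item_id is None:
        logging.warning('Topic for item type "%s" is not configured.', item_type)
        return None

    topic = exporter.item_type_to_topic_mapping[item_type]
    if dedup_enabled and exporter.already_processed(item_type, item_id):
        logging.info('Message already processed, skipping. Type=[%s]; Id=[%s]', item_type, item_id)
        return None

    data = json.dumps(item).encode('utf-8')
    future = exporter.producer.send(topic, value=data)
    if dedup_enabled:
        # Mark only after the broker acknowledges the record.
        future.add_callback(lambda _metadata: exporter.mark_processed(item_type, item_id))
    # On failure, log instead of marking the item as processed.
    future.add_errback(lambda exc: logging.error(
        'Produce failed, not marking as processed. Type=[%s]; Id=[%s]; Error=%s', item_type, item_id, exc))
    return future
```

This mirrors the direction the branch later takes in the commits "feat: produce to kafka and then mark in redis" and "move marking redis after kafka message was sent".
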
15 changes: 4 additions & 11 deletions blockchainetl/streaming/streamer.py
@@ -63,8 +63,10 @@ def __init__(
self.prometheus_client = PrometheusConnector()

if self.mode == constants.RUN_MODE_NORMAL:
if self.start_block is not None or not os.path.isfile(self.last_synced_block_file):
init_last_synced_block_file((self.start_block or 0) - 1, self.last_synced_block_file)
# if "start-block" is provided and "last_synced_block_file" is not present
# then write "start-block - 1" to "last_synced_block_file"
if (self.start_block is not None) and (not os.path.isfile(self.last_synced_block_file)):
write_last_synced_block(self.last_synced_block_file, self.start_block - 1)

self.last_synced_block = read_last_synced_block(self.last_synced_block_file)

@@ -162,15 +164,6 @@ def write_last_synced_block(file, last_synced_block):
write_to_file(file, str(last_synced_block) + '\n')


def init_last_synced_block_file(start_block, last_synced_block_file):
if os.path.isfile(last_synced_block_file):
raise ValueError(
'{} should not exist if --start-block option is specified. '
'Either remove the {} file or the --start-block option.'
.format(last_synced_block_file, last_synced_block_file))
write_last_synced_block(last_synced_block_file, start_block)


def read_last_synced_block(file):
with smart_open(file, 'r') as last_synced_block_file:
return int(last_synced_block_file.read())
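
For clarity, a small hedged sketch of the start-up behaviour introduced above: --start-block only seeds the file when it does not exist yet, and on every run (including restarts) the file decides where syncing resumes. The file name and block number below are placeholders.

```python
import os

last_synced_block_file = 'last_synced_block.txt'  # placeholder path
start_block = 1_000_000                           # placeholder --start-block value

# First run only: seed the file with start_block - 1 so syncing begins at start_block.
if start_block is not None and not os.path.isfile(last_synced_block_file):
    with open(last_synced_block_file, 'w') as f:
        f.write(str(start_block - 1) + '\n')

# Every run: the file, not --start-block, determines the resume point.
with open(last_synced_block_file) as f:
    last_synced_block = int(f.read())
```
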
23 changes: 12 additions & 11 deletions ethereumetl/cli/stream.py
@@ -32,8 +32,10 @@
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.constants import constants

from prometheus_client import start_http_server

@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option('-l', '--last-synced-block-file', default='last_synced_block.txt', show_default=True, type=str, help='')
@click.option('-l', '--last-synced-block-file', default='last_synced_block.txt', show_default=True, type=str, help='Stores last-synced-block-number and is used to continue sync in-case of restarts. If both "--last-synced-block-file" and "--start-block" are provided then block-number in "--last-synced-block-file" takes precedence')
@click.option('--lag', default=0, show_default=True, type=int, help='The number of blocks to lag behind the network.')
@click.option('-o', '--output', type=str,
help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; '
@@ -42,7 +44,7 @@
'or kafka, output name and connection host:port e.g. kafka/127.0.0.1:9092 '
'or Kinesis, e.g. kinesis://your-data-stream-name'
'If not specified will print to console')
@click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block')
@click.option('-s', '--start-block', required=True, type=int, help='Start block')
@click.option('-E', '--end-block', default=None, show_default=True, type=int, help='End block')
@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str,
help='The list of entity types to export.')
@@ -63,16 +65,9 @@ def stream(last_synced_block_file, lag, output, start_block, end_block, entity_t

from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapter
from blockchainetl.streaming.streamer import Streamer

if os.environ['BLOCKCHAIN'] == None:
raise ValueError('BLOCKCHAIN env is missing')

check_required_envs()
provider_uri = os.environ['PROVIDER_URI']
if provider_uri == None:
raise ValueError('PROVIDER_URI env is missing')

if os.environ['KAFKA_BROKER_URI'] == None:
raise ValueError('KAFKA_BROKER_URI env is missing')

if mode == constants.RUN_MODE_CORRECTION:
blocks_to_reprocess = [int(block) for block in blocks_to_reprocess.split(',')]
@@ -105,9 +100,15 @@ def stream(last_synced_block_file, lag, output, start_block, end_block, entity_t
streamer.stream()


def check_required_envs():
for env in constants.REQUIRED_ENVS:
if os.environ[env] == None:
raise ValueError(f'{env} env is missing')


def parse_entity_types(entity_types):
entity_types = [c.strip() for c in entity_types.split(',')]

# validate passed types
for entity_type in entity_types:
if entity_type not in EntityType.ALL_FOR_STREAMING:
47 changes: 46 additions & 1 deletion ethereumetl/constants/constants.py
@@ -1,3 +1,4 @@
from ethereumetl.enumeration.entity_type import EntityType

TOKEN_TYPE_ERC20 = 'ERC20'
TOKEN_TYPE_ERC721 = 'ERC721'
@@ -42,4 +43,48 @@
RUN_MODE_CORRECTION = 'correction'
RUN_MODE_NORMAL= 'normal'

METRICS_PORT = '9000'
# variables for deduplication
REDIS_PREFIX = 'etl'
METRICS_PORT = '9000'
CLICKHOUSE_FALLBACK_DAYS = '1'
CLICKHOUSE_QUERY_CHUNK_SIZE = 1500

REQUIRED_ENVS = [
# envs for kafka integration
'BLOCKCHAIN',
'PROVIDER_URI',
'KAFKA_BROKER_URI',

# envs for deduplication support
'REDIS_HOST',
'REDIS_PORT',
'REDIS_DB',
'REDIS_MESSAGE_TTL',
'CLICKHOUSE_HOST',
'CLICKHOUSE_PORT',
'CLICKHOUSE_USERNAME',
'CLICKHOUSE_PASSWORD',
'CLICKHOUSE_DATABASE'
]

ENTITY_TO_TABLE_MAP = {
EntityType.BLOCK: 'blocks',
EntityType.TRANSACTION: 'transactions',
EntityType.LOG: 'logs',
EntityType.TOKEN_TRANSFER: 'token_transfers',
EntityType.TRACE: 'traces',
EntityType.GETH_TRACE: 'traces',
EntityType.CONTRACT: 'contracts',
EntityType.TOKEN: 'tokens',
}

ENTITY_TO_TABLE_TS_COLUMNS_MAP = {
EntityType.BLOCK: 'timestamp',
EntityType.TOKEN: 'created_block_timestamp',
EntityType.TRANSACTION: 'block_timestamp',
EntityType.LOG: 'block_timestamp',
EntityType.TOKEN_TRANSFER: 'block_timestamp',
EntityType.TRACE: 'block_timestamp',
EntityType.GETH_TRACE: 'block_timestamp',
EntityType.CONTRACT: 'block_timestamp',
}
46 changes: 46 additions & 0 deletions ethereumetl/deduplication/clickhouse.py
@@ -0,0 +1,46 @@
import os
import clickhouse_connect
import logging

class Clickhouse:
"""
Clickhouse Connector
"""

def __init__(self):
"""
Connect to database and provide it's client
:param host:
:param port:
:param username:
:param password:
:param database:
"""
logging.debug('Connecting to clickhouse !!')

self._host = os.environ['CLICKHOUSE_HOST']
self._port = os.environ['CLICKHOUSE_PORT']
self._username = os.environ['CLICKHOUSE_USERNAME']
self._password = os.environ['CLICKHOUSE_PASSWORD']
self._database = os.environ['CLICKHOUSE_DATABASE']

logging.debug(
f'Making Connection to DB with host: {self._host}, port: {self._port}, database: {self._database}'
)
self._connection = clickhouse_connect.get_client(host=self._host, port=self._port, secure=True,
username=self._username, password=self._password, database=self._database)


async def run_query(self, query, parameters):
"""Function to run query on clickhouse

Args:
query (str): query to run
parameters (dict): variable parameters

Returns:
list: fetched data
"""
logging.debug(f'Running SQL Query {query}')
result = self._connection.query(query=query, parameters=parameters)
return result.result_rows
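
For context, a short hedged sketch of how this connector can be driven from synchronous code (deduplicate.py below uses asyncio.run the same way). The table name and ids are placeholders; the parameter-binding syntax matches the query used later in this PR.

```python
import asyncio

# Assumes the CLICKHOUSE_* variables from constants.REQUIRED_ENVS are set in the environment.
db = Clickhouse()

query = 'SELECT id FROM {table:Identifier} WHERE id IN {ids:Array(String)}'
parameters = {'table': 'transactions', 'ids': ['0xaaa', '0xbbb']}  # placeholder values

rows = asyncio.run(db.run_query(query, parameters))
existing_ids = {row[0] for row in rows}  # result_rows is a list of tuples
```
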
67 changes: 67 additions & 0 deletions ethereumetl/deduplication/deduplicate.py
@@ -0,0 +1,67 @@
import os
import logging
import asyncio
from datetime import datetime
from ethereumetl.constants import constants

def deduplicate_records(records, ts_key, db):
ch_fallback_days = int(os.environ.get('CLICKHOUSE_FALLBACK_DAYS', constants.CLICKHOUSE_FALLBACK_DAYS))

if records == None or len(records) == 0:
return records

min_ts = get_minimum_ts(records, ts_key)
if is_ts_older(min_ts, ch_fallback_days):
records = asyncio.run(filter_records(records, min_ts, db))
return records

def is_ts_older(ts, days):
difference = datetime.utcnow() - datetime.utcfromtimestamp(ts)
return difference.days > days

async def filter_records(items, min_ts_epoch, db):
if items == None or len(items) == 0:
return items

message_type = items[0].get('type')
Member: Will all items in the list be of the same type?

Author: Yes, this deduplicate function is run once per message type, on an array of that type.

skip_message_types = os.environ.get('CLICKHOUSE_SKIP_FOR_MESSAGE_TYPES')

if skip_message_types != None and (message_type in skip_message_types.split(',')):
logging.info(f'Ignoring check for deduplication for type {message_type} as it is ignored')
return items

table_name = get_table_name(message_type)
ts_column_name = get_table_ts_column_name(message_type)
if table_name == None:
logging.warn(f'Ignoring check for deduplication for type {message_type} as table not found')
Member: This should be a panic instead of a warning; the service should stop execution here.

return items

min_ts = datetime.utcfromtimestamp(min_ts_epoch).strftime('%Y-%m-%d')

# extract all ids
ids = list([obj["id"] for obj in items])
ids_from_db = []

parameters = { 'table': table_name, 'ids': [], 'timestamp_key': ts_column_name, 'block_timestamp': min_ts }
query = '''SELECT id FROM {table:Identifier} WHERE id IN {ids:Array(String)} and {timestamp_key:Identifier} >= {block_timestamp:String}'''
Member: Why >= and not just =?

Member: Also, is there any need for batching?

Author: On ">= vs =": we check against the minimum timestamp obtained from the RPC records. Some records in the batch may have a timestamp higher than that minimum, so we use >= to still match them in ClickHouse. If needed, to reduce the number of partitions scanned, we could also compute the maximum timestamp and add a <= bound.

Author: On batching: yes, I have observed that a single SQL query fails when the IN condition contains more than 1500 elements, hence the chunking below.


chunk_size = int(os.environ.get('CLICKHOUSE_QUERY_CHUNK_SIZE', constants.CLICKHOUSE_QUERY_CHUNK_SIZE))
for i in range(0, len(ids), chunk_size):
chunk = ids[i:i + chunk_size]
parameters['ids'] = chunk

db_results = await db.run_query(query, parameters)
ids_from_db = ids_from_db + [t[0] for t in db_results]

return [item for item in items if item['id'] not in ids_from_db]
Member: Is a linear list search efficient here in Python? IMO the better approach is to use a map (or set) for the membership check — see the hedged sketch after this file's diff.


def get_table_name(message_type):
return constants.ENTITY_TO_TABLE_MAP.get(message_type)

def get_table_ts_column_name(message_type):
return constants.ENTITY_TO_TABLE_TS_COLUMNS_MAP.get(message_type)

def get_minimum_ts(items, key):
# get timestamp of oldest message from items list
record = min(items, key=lambda x: x.get(key, float('inf')))
return record.get(key)
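
Picking up the two review threads above (chunked IN queries and map/set membership), here is a minimal hedged sketch of the filtering step. It is not the PR's implementation; it only illustrates the same query with the matched ids collected into a set so the final filter does O(1) lookups per item.

```python
import asyncio

CHUNK_SIZE = 1500  # IN-clause size limit observed in the review thread


async def filter_records_with_set(items, table_name, ts_column_name, min_ts, db):
    """Hypothetical variant of filter_records; db is the Clickhouse connector above."""
    ids = [item['id'] for item in items]
    query = ('SELECT id FROM {table:Identifier} '
             'WHERE id IN {ids:Array(String)} AND {timestamp_key:Identifier} >= {block_timestamp:String}')

    seen_ids = set()
    for i in range(0, len(ids), CHUNK_SIZE):
        parameters = {
            'table': table_name,
            'ids': ids[i:i + CHUNK_SIZE],
            'timestamp_key': ts_column_name,
            'block_timestamp': min_ts,
        }
        rows = await db.run_query(query, parameters)
        seen_ids.update(row[0] for row in rows)

    # Set membership makes this filter O(1) per item instead of scanning a list.
    return [item for item in items if item['id'] not in seen_ids]
```
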
27 changes: 27 additions & 0 deletions ethereumetl/deduplication/redis.py
@@ -0,0 +1,27 @@
import os
import hashlib
import redis
from ethereumetl.constants import constants

class RedisConnector:
def __init__(self):
self.ttl = os.environ['REDIS_MESSAGE_TTL']

redis_host = os.environ['REDIS_HOST']
redis_port = os.environ['REDIS_PORT']
redis_database = os.environ['REDIS_DB']
Member: Use the chainId for REDIS_DB?

Author: Sure, but let's do that for the next chains. For the current DEGEN deployment the value is already set, and changing it now could break dedup against keys stored in a different database. It is configurable via envs, so the switch should be easy to make.


self.redis_client = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_database)

def exists_in_set(self, key, value):
return self.redis_client.exists(self.create_key(key, value))

def add_to_set(self, key, value):
return self.redis_client.setex(self.create_key(key, value), self.ttl, '1')

def create_key(self, key, value):
hashed_data = hashlib.sha1(f"{key}_{value}".encode()).hexdigest()
return f"{constants.REDIS_PREFIX}_{hashed_data}"

def close(self):
self.redis_client.close()
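
A brief hedged usage sketch of the connector above, mirroring how kafka_exporter.py calls it; the env vars are assumed to be set and the type/id values are illustrative.

```python
# Assumes REDIS_HOST, REDIS_PORT, REDIS_DB and REDIS_MESSAGE_TTL are set in the environment.
redis_connector = RedisConnector()

item_type, item_id = 'transaction', '0xabc123'  # placeholder values
if not redis_connector.exists_in_set(item_type, item_id):
    # Stores sha1("transaction_0xabc123") under the 'etl_' prefix, with the configured TTL.
    redis_connector.add_to_set(item_type, item_id)
redis_connector.close()
```
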
2 changes: 2 additions & 0 deletions ethereumetl/domain/trace.py
@@ -23,6 +23,8 @@

class EthTrace(object):
def __init__(self):
self.before_evm_transfers = None
self.after_evm_transfers = None
self.block_number = None
self.transaction_hash = None
self.transaction_index = None
1 change: 1 addition & 0 deletions ethereumetl/jobs/export_geth_traces_job.py
@@ -73,6 +73,7 @@ def _export_batch(self, block_number_batch):

# add tx_hash to tx_trace
for obj in result:
# TODO: fix this error
obj['result']['tx_hash'] = obj.get('txHash')
trace_error = obj.get('result').get('error')
if trace_error is not None:
2 changes: 1 addition & 1 deletion ethereumetl/jobs/extract_contracts_job.py
@@ -59,7 +59,7 @@ def _extract_contracts(self, traces):
trace['block_number'] = to_int_or_none(trace.get('block_number'))

contract_creation_traces = [trace for trace in traces
if trace.get('trace_type') == 'create' and trace.get('to_address') is not None
if (trace.get('trace_type') == 'create' or trace.get('trace_type') == 'create2') and trace.get('to_address') is not None
and len(trace.get('to_address')) > 0 and trace.get('status') == 1]

contracts = []