DEGEN ETL Improvements [V1] #10

Open

wants to merge 66 commits into base: main

Changes from all commits (66 commits)
18a0634
support for start-block and file arg
manjeet9727 Apr 7, 2024
854cd2f
check messages in set before publishing
manjeet9727 Apr 7, 2024
6bacb79
support for bloom-filters
manjeet9727 Apr 7, 2024
e1d4a48
support for prometheus-metrics
manjeet9727 Apr 7, 2024
3a28809
change sorting order & minor redis changes
manjeet9727 Apr 8, 2024
42d8c6d
Merge branch 'main' of github.com:Airstack-xyz/ethereum-etl into feat…
manjeet9727 Apr 8, 2024
fc4c540
refactor code
manjeet9727 Apr 8, 2024
2f0298d
refactor code
manjeet9727 Apr 8, 2024
00d0f82
refactor code
manjeet9727 Apr 8, 2024
acf6122
Update setup.py
manjeet9727 Apr 8, 2024
5db3cd3
fix: lz4
manjeet9727 Apr 8, 2024
77871a3
Merge pull request #14 from Airstack-xyz/main
manjeet9727 Apr 8, 2024
2adc05b
Support for both start-block vs last-synced-file
manjeet9727 Apr 8, 2024
eb42f62
Support for both start-block vs last-synced-file
manjeet9727 Apr 9, 2024
acb31a8
Merge branch 'main' into feat/degen-etl-improvements
manjeet9727 Apr 9, 2024
be93dc5
Merge branch 'feat/degen-start-block' into feat/degen-etl-improvements
manjeet9727 Apr 9, 2024
9ce6504
WIP: CH support
manjeet9727 Apr 9, 2024
14bfff6
FIX: balance traces
manjeet9727 Apr 9, 2024
2b17ec7
FIX: balance traces
manjeet9727 Apr 9, 2024
42a87a9
FIX: balance traces
manjeet9727 Apr 9, 2024
c330081
Merge branch 'fix/traces' into feat/degen-etl-improvements
manjeet9727 Apr 9, 2024
ccdc5d2
process blocks at the end
manjeet9727 Apr 9, 2024
022e5c0
WIP: CH support
manjeet9727 Apr 9, 2024
52dfa13
WIP: CH support
manjeet9727 Apr 9, 2024
f622926
support for redis lock
manjeet9727 Apr 9, 2024
1d52c8e
support for redis lock
manjeet9727 Apr 10, 2024
a769547
WIP dedup logic
manjeet9727 Apr 10, 2024
5ee0a0b
Added support for dedup using CH
manjeet9727 Apr 10, 2024
c52eb78
Added support for dedup using CH
manjeet9727 Apr 10, 2024
d03e88c
Added support for dedup using CH
manjeet9727 Apr 10, 2024
975dea2
Added support for dedup using CH
manjeet9727 Apr 10, 2024
9e76bef
Added support for dedup using CH
manjeet9727 Apr 10, 2024
38fb76a
Added support for dedup using CH
manjeet9727 Apr 10, 2024
a2fffaf
Added support for dedup using CH
manjeet9727 Apr 10, 2024
4c99c0e
Added support for dedup using CH
manjeet9727 Apr 10, 2024
a82e57b
Added backfill jobs
manjeet9727 Apr 11, 2024
b71fb0f
Added backfill jobs
manjeet9727 Apr 11, 2024
89ee8a7
Merge pull request #15 from Airstack-xyz/feat/degen-start-block
0xsarvesh Apr 14, 2024
02e5b87
Added more logs
manjeet9727 Apr 14, 2024
b59df5f
make last-synced-block-file as required param
manjeet9727 Apr 15, 2024
d181885
fix: str traces value
manjeet9727 Apr 15, 2024
35fed03
Merge branch 'develop' into feat/degen-etl-improvements
manjeet9727 Apr 15, 2024
ebb3dd8
added details for EVM Transfers
manjeet9727 Apr 16, 2024
6d6b8df
code review fixes
manjeet9727 Apr 16, 2024
f90e705
code review fixes
manjeet9727 Apr 16, 2024
6e28885
revert: str traces value
manjeet9727 Apr 17, 2024
4f451f4
WIP: backmerge till ca54ef6
manjeet9727 Apr 23, 2024
e438604
fix backmerge
manjeet9727 Apr 24, 2024
d1c63f1
fix backmerge
manjeet9727 Apr 24, 2024
f62ca04
Merge branch 'fix/traces' into feat/degen-etl-improvements
manjeet9727 Apr 24, 2024
06cef7e
Merge branch 'feat/degen-etl-improvements' into feat/degen-v2-backmerge
manjeet9727 Apr 24, 2024
39c407f
Merge branch 'feat/degen-etl-improvements' into feat/degen-etl-improv…
manjeet9727 Apr 24, 2024
b157061
Merge branch 'feat/degen-v2-backmerge' into feat/etl-improvements-com…
manjeet9727 Apr 24, 2024
bf5899e
fix: add util to convert numric to string
manjeet9727 Apr 24, 2024
dbf7f2d
feat: produce to kafka and then mark in redis
manjeet9727 Apr 25, 2024
c60b1d4
update table mapping for token stream
manjeet9727 Apr 25, 2024
2f7b41c
Revert "WIP: backmerge till ca54ef6"
manjeet9727 Apr 25, 2024
ebf5079
fix: do not handle produce error
manjeet9727 Apr 28, 2024
94e2eb6
test: dont log produce message
manjeet9727 Apr 29, 2024
68fb8a3
wip: test futures handling
manjeet9727 Apr 30, 2024
ce4812a
feat: raise exception
manjeet9727 May 14, 2024
563197b
add AttributeError except for future
14MR May 20, 2024
5ebe998
filter out None values in futures
14MR May 20, 2024
0d1be65
move marking redis after kafka message was sent
14MR May 23, 2024
d4cea15
set acks='all', linger_ms=5
14MR May 28, 2024
a28ff29
Merge pull request #20 from Airstack-xyz/feat/etl-improvements-v2
14MR Jun 6, 2024
1 change: 1 addition & 0 deletions .python-version
@@ -0,0 +1 @@
3.9.6
86 changes: 73 additions & 13 deletions blockchainetl/jobs/exporters/kafka_exporter.py
@@ -7,57 +7,117 @@
from kafka import KafkaProducer

from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter

from ethereumetl.deduplication.redis import RedisConnector
from ethereumetl.utils import convert_numeric_to_string


class KafkaItemExporter:

def __init__(self, item_type_to_topic_mapping, converters=()):
self.item_type_to_topic_mapping = item_type_to_topic_mapping
self.converter = CompositeItemConverter(converters)
        self.enable_deduplication = (os.environ.get('ENABLE_DEDUPLICATION') is not None)

self.connection_url = self.get_connection_url()
print(self.connection_url)
self.producer = KafkaProducer(
bootstrap_servers=self.connection_url,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=os.getenv('KAFKA_SCRAM_USERID'),
sasl_plain_password=os.getenv('KAFKA_SCRAM_PASSWORD'),
client_id=socket.gethostname(),
            compression_type=os.environ.get('KAFKA_COMPRESSION', 'lz4'),
            request_timeout_ms=60000,
            max_block_ms=120000,
            buffer_memory=100000000,
            retries=5,
            batch_size=32768,
            linger_ms=5,
            acks='all'
        )

# use redis for deduplication of live messages
self.redis = None
if self.enable_deduplication:
self.redis = RedisConnector()

def get_connection_url(self):
        kafka_broker_uri = os.environ.get('KAFKA_BROKER_URI')
        if kafka_broker_uri is None:
            raise Exception('KAFKA_BROKER_URI is not set')
return kafka_broker_uri.split(',')

def open(self):
pass

def export_items(self, items):
futures = []
for item in items:
            futures.append(self.export_item(item))

futures = [f for f in futures if f is not None] # filter out None values

# wait for all messages to be sent
for item_topic, item_id, future in futures:
try:
res = future.get(timeout=10)
if res:
self.mark_processed(item_topic, item_id)
else:
logging.error('Empty response received')
except Exception as e:
logging.error(f'Failed to send message: {e}')
raise e

def export_item(self, item):
item_type = item.get('type')
item_id = item.get('id')

if (item_id is None) or (item_type is None) or (item_type not in self.item_type_to_topic_mapping):
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
return

item_topic = self.item_type_to_topic_mapping[item_type]
data = self.parse_data(item)

if self.enable_deduplication:
if not self.already_processed(item_topic, item_id):
logging.info(f'Processing message of Type=[{item_type}]; Id=[{item_id}]')
return item_topic, item_id, self.produce_message(item_topic, data)
logging.info(f'Message was already processed skipping... Type=[{item_type}]; Id=[{item_id}]')
else:
return item_topic, item_id, self.produce_message(item_topic, data)

def convert_items(self, items):
for item in items:
yield self.converter.convert_item(item)

    def close(self):
        if self.redis is not None:
            self.redis.close()

    # utility function to mark a message as processed in Redis
    def mark_processed(self, item_type, item_id):
        if self.redis is not None:
            return self.redis.add_to_set(item_type, item_id)
        return False

    # utility function to check whether a message was already processed
    def already_processed(self, item_type, item_id):
        if self.redis is not None:
            return self.redis.exists_in_set(item_type, item_id)
        return False

    # utility function to produce a message to Kafka
def produce_message(self, item_type, data):
return self.producer.send(item_type, value=data)

    # utility function to convert numeric fields to strings before JSON serialization
def parse_data(self, item):
data = convert_numeric_to_string(item)
return json.dumps(data).encode('utf-8')


def group_by_item_type(items):
result = collections.defaultdict(list)
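The parse_data helper above depends on convert_numeric_to_string, which is imported from ethereumetl.utils but defined outside this diff. A minimal sketch of what such a helper could look like, offered purely as an assumption about its behaviour (recursively casting numbers to strings so large EVM values survive downstream JSON consumers):

# Hypothetical sketch of ethereumetl.utils.convert_numeric_to_string; the real
# helper is outside this diff, so treat names and behaviour here as assumptions.
def convert_numeric_to_string(value):
    # bool is a subclass of int in Python, so keep booleans untouched
    if isinstance(value, bool):
        return value
    # cast plain numbers to strings so large EVM integers are not truncated by JSON consumers
    if isinstance(value, (int, float)):
        return str(value)
    # recurse into containers
    if isinstance(value, list):
        return [convert_numeric_to_string(v) for v in value]
    if isinstance(value, dict):
        return {k: convert_numeric_to_string(v) for k, v in value.items()}
    return value

# e.g. convert_numeric_to_string({'value': 10**21}) == {'value': '1000000000000000000000'}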
16 changes: 5 additions & 11 deletions blockchainetl/streaming/streamer.py
@@ -63,8 +63,10 @@ def __init__(
self.prometheus_client = PrometheusConnector()

if self.mode == constants.RUN_MODE_NORMAL:
# if "start-block" is provided and "last_synced_block_file" is not present
# then write "start-block - 1" to "last_synced_block_file"
if (self.start_block is not None) and (not os.path.isfile(self.last_synced_block_file)):
write_last_synced_block(self.last_synced_block_file, self.start_block - 1)

self.last_synced_block = read_last_synced_block(self.last_synced_block_file)

@@ -102,6 +104,7 @@ def _do_stream_correction(self):
time.sleep(self.period_seconds)

def _do_stream(self):
logging.info('End Block={} LastSyncedBlock={}'.format(self.end_block, self.last_synced_block))
while (self.end_block is None or self.last_synced_block < self.end_block):
synced_blocks = 0
try:
@@ -162,15 +165,6 @@ def write_last_synced_block(file, last_synced_block):
write_to_file(file, str(last_synced_block) + '\n')


def init_last_synced_block_file(start_block, last_synced_block_file):
if os.path.isfile(last_synced_block_file):
raise ValueError(
'{} should not exist if --start-block option is specified. '
'Either remove the {} file or the --start-block option.'
.format(last_synced_block_file, last_synced_block_file))
write_last_synced_block(last_synced_block_file, start_block)


def read_last_synced_block(file):
with smart_open(file, 'r') as last_synced_block_file:
return int(last_synced_block_file.read())
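A small illustration of the resume behaviour introduced above, using the module's own write_last_synced_block and read_last_synced_block helpers; the file path and block number are placeholder assumptions:

# Illustration only: seed the last-synced-block file on first run, then reuse it.
import os
import tempfile

from blockchainetl.streaming.streamer import read_last_synced_block, write_last_synced_block

last_synced_block_file = os.path.join(tempfile.mkdtemp(), 'last_synced_block.txt')
start_block = 12_000_000  # assumed value

# Mirrors the new streamer logic: only seed the file when --start-block is given
# and the file does not exist yet; afterwards the file's value wins on restarts.
if not os.path.isfile(last_synced_block_file):
    write_last_synced_block(last_synced_block_file, start_block - 1)

print(read_last_synced_block(last_synced_block_file))  # -> 11999999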
23 changes: 12 additions & 11 deletions ethereumetl/cli/stream.py
@@ -32,8 +32,10 @@
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.constants import constants

from prometheus_client import start_http_server

@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option('-l', '--last-synced-block-file', required=True, type=str, help='Stores the last synced block number and is used to continue syncing in case of restarts. If both "--last-synced-block-file" and "--start-block" are provided, the block number in "--last-synced-block-file" takes precedence.')
@click.option('--lag', default=0, show_default=True, type=int, help='The number of blocks to lag behind the network.')
@click.option('-o', '--output', type=str,
help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; '
@@ -42,7 +44,7 @@
'or kafka, output name and connection host:port e.g. kafka/127.0.0.1:9092 '
'or Kinesis, e.g. kinesis://your-data-stream-name'
'If not specified will print to console')
@click.option('-s', '--start-block', required=True, type=int, help='Start block')
@click.option('-E', '--end-block', default=None, show_default=True, type=int, help='End block')
@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str,
help='The list of entity types to export.')
@@ -63,16 +65,9 @@ def stream(last_synced_block_file, lag, output, start_block, end_block, entity_t

from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapter
from blockchainetl.streaming.streamer import Streamer

check_required_envs()
provider_uri = os.environ['PROVIDER_URI']

if mode == constants.RUN_MODE_CORRECTION:
blocks_to_reprocess = [int(block) for block in blocks_to_reprocess.split(',')]
@@ -105,9 +100,15 @@ def stream(last_synced_block_file, lag, output, start_block, end_block, entity_t
streamer.stream()


def check_required_envs():
for env in constants.REQUIRED_ENVS:
        if os.environ.get(env) is None:
            raise ValueError(f'{env} env is missing')


def parse_entity_types(entity_types):
entity_types = [c.strip() for c in entity_types.split(',')]

# validate passed types
for entity_type in entity_types:
if entity_type not in EntityType.ALL_FOR_STREAMING:
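With both --start-block and --last-synced-block-file now required, a typical run passes both, with the file taking precedence once it exists. A hedged invocation sketch follows; the block number, file path, entity types and output target are assumptions, the REQUIRED_ENVS from constants.py are expected to be exported, and it assumes the package's ethereumetl console entry point:

# Hypothetical invocation sketch, driving the CLI from Python via subprocess.
import subprocess

subprocess.run(
    [
        'ethereumetl', 'stream',
        '--start-block', '12000000',                           # required; used only to seed the file on first run
        '--last-synced-block-file', 'last_synced_block.txt',   # required; takes precedence on restarts
        '--entity-types', 'block,transaction,log,token_transfer',
        '--output', 'kafka/127.0.0.1:9092',
    ],
    check=True,
)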
47 changes: 46 additions & 1 deletion ethereumetl/constants/constants.py
@@ -1,3 +1,4 @@
from ethereumetl.enumeration.entity_type import EntityType

TOKEN_TYPE_ERC20 = 'ERC20'
TOKEN_TYPE_ERC721 = 'ERC721'
@@ -42,4 +43,48 @@
RUN_MODE_CORRECTION = 'correction'
RUN_MODE_NORMAL= 'normal'

# variables for deduplication
REDIS_PREFIX = 'etl'
METRICS_PORT = '9000'
CLICKHOUSE_FALLBACK_DAYS = '1'
CLICKHOUSE_QUERY_CHUNK_SIZE = 1500

REQUIRED_ENVS = [
# envs for kafka integration
'BLOCKCHAIN',
'PROVIDER_URI',
'KAFKA_BROKER_URI',

# envs for deduplication support
'REDIS_HOST',
'REDIS_PORT',
'REDIS_DB',
'REDIS_MESSAGE_TTL',
'CLICKHOUSE_HOST',
'CLICKHOUSE_PORT',
'CLICKHOUSE_USERNAME',
'CLICKHOUSE_PASSWORD',
'CLICKHOUSE_DATABASE'
]

ENTITY_TO_TABLE_MAP = {
EntityType.BLOCK: 'blocks',
EntityType.TRANSACTION: 'transactions',
EntityType.LOG: 'logs',
EntityType.TOKEN_TRANSFER: 'token_transfers',
EntityType.TRACE: 'traces',
EntityType.GETH_TRACE: 'traces',
EntityType.CONTRACT: 'contracts',
EntityType.TOKEN: 'enriched_contracts',
}

ENTITY_TO_TABLE_TS_COLUMNS_MAP = {
EntityType.BLOCK: 'timestamp',
EntityType.TOKEN: 'block_timestamp',
EntityType.TRANSACTION: 'block_timestamp',
EntityType.LOG: 'block_timestamp',
EntityType.TOKEN_TRANSFER: 'block_timestamp',
EntityType.TRACE: 'block_timestamp',
EntityType.GETH_TRACE: 'block_timestamp',
EntityType.CONTRACT: 'block_timestamp',
}
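A hedged sketch of how these maps and the ClickHouse constants could be combined by the deduplication path; the actual query logic lives elsewhere in this PR, and the id column name is an assumption:

# Hypothetical helper, not part of this diff: chunk candidate ids and restrict the
# lookup window to CLICKHOUSE_FALLBACK_DAYS; the `id` column name is an assumption.
from ethereumetl.constants import constants


def build_dedup_queries(entity_type, item_ids):
    table = constants.ENTITY_TO_TABLE_MAP[entity_type]
    ts_column = constants.ENTITY_TO_TABLE_TS_COLUMNS_MAP[entity_type]
    lookback_days = int(constants.CLICKHOUSE_FALLBACK_DAYS)  # stored as a string constant
    chunk_size = constants.CLICKHOUSE_QUERY_CHUNK_SIZE

    for start in range(0, len(item_ids), chunk_size):
        chunk = item_ids[start:start + chunk_size]
        # server-side bound Array parameter; has(array, elem) keeps only known ids
        yield (
            f'SELECT id FROM {table} '
            f'WHERE {ts_column} >= now() - INTERVAL {lookback_days} DAY '
            f'AND has({{ids:Array(String)}}, id)',
            {'ids': chunk},
        )

With 4,000 candidate ids and CLICKHOUSE_QUERY_CHUNK_SIZE of 1500, this would yield three (query, parameters) pairs that could be fed to the ClickHouse connector below.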
46 changes: 46 additions & 0 deletions ethereumetl/deduplication/clickhouse.py
@@ -0,0 +1,46 @@
import os
import clickhouse_connect
import logging

class Clickhouse:
"""
Clickhouse Connector
"""

def __init__(self):
"""
Connect to database and provide it's client
:param host:
:param port:
:param username:
:param password:
:param database:
"""
logging.debug('Connecting to clickhouse !!')

self._host = os.environ['CLICKHOUSE_HOST']
self._port = os.environ['CLICKHOUSE_PORT']
self._username = os.environ['CLICKHOUSE_USERNAME']
self._password = os.environ['CLICKHOUSE_PASSWORD']
self._database = os.environ['CLICKHOUSE_DATABASE']

logging.debug(
f'Making Connection to DB with host: {self._host}, port: {self._port}, database: {self._database}'
)
self._connection = clickhouse_connect.get_client(host=self._host, port=self._port, secure=True,
username=self._username, password=self._password, database=self._database)


async def run_query(self, query, parameters):
"""Function to run query on clickhouse

Args:
query (str): query to run
parameters (dict): variable parameters

Returns:
list: fetched data
"""
logging.debug(f'Running SQL Query {query}')
result = self._connection.query(query=query, parameters=parameters)
return result.result_rows
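A usage sketch for the connector; it assumes the CLICKHOUSE_* env vars are set and that a transactions table with an id column exists, and since run_query is declared async it must be awaited even though the underlying clickhouse_connect call is synchronous:

# Hypothetical usage, not part of this diff.
import asyncio

from ethereumetl.deduplication.clickhouse import Clickhouse


async def main():
    ch = Clickhouse()
    rows = await ch.run_query(
        'SELECT id FROM transactions WHERE block_timestamp >= now() - INTERVAL 1 DAY LIMIT 10',
        {},
    )
    print(rows)


asyncio.run(main())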