diff --git a/.python-version b/.python-version
new file mode 100644
index 000000000..1635d0f5a
--- /dev/null
+++ b/.python-version
@@ -0,0 +1 @@
+3.9.6
diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py
index 757424691..3fcb595a0 100644
--- a/blockchainetl/jobs/exporters/kafka_exporter.py
+++ b/blockchainetl/jobs/exporters/kafka_exporter.py
@@ -7,7 +7,8 @@
 from kafka import KafkaProducer
 
 from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter
-
+from ethereumetl.deduplication.redis import RedisConnector
+from ethereumetl.utils import convert_numeric_to_string
 
 class KafkaItemExporter:
 
@@ -15,8 +16,9 @@ class KafkaItemExporter:
     def __init__(self, item_type_to_topic_mapping, converters=()):
         self.item_type_to_topic_mapping = item_type_to_topic_mapping
         self.converter = CompositeItemConverter(converters)
+        self.enable_deduplication = (os.environ.get('ENABLE_DEDUPLICATION') is not None)
+
         self.connection_url = self.get_connection_url()
-        print(self.connection_url)
         self.producer = KafkaProducer(
             bootstrap_servers=self.connection_url,
             security_protocol='SASL_SSL',
@@ -24,40 +26,98 @@ def __init__(self, item_type_to_topic_mapping, converters=()):
             sasl_plain_username=os.getenv('KAFKA_SCRAM_USERID'),
             sasl_plain_password=os.getenv('KAFKA_SCRAM_PASSWORD'),
             client_id=socket.gethostname(),
-            compression_type='lz4',
-            request_timeout_ms= 60000,
-            max_block_ms= 120000,
-            buffer_memory= 100000000)
+            compression_type=os.environ.get('KAFKA_COMPRESSION', 'lz4'),
+            request_timeout_ms=60000,
+            max_block_ms=120000,
+            buffer_memory=100000000,
+            retries=5,
+            batch_size=32768,
+            linger_ms=5,
+            acks='all'
+        )
+
+        # use redis for deduplication of live messages
+        self.redis = None
+        if self.enable_deduplication:
+            self.redis = RedisConnector()
 
     def get_connection_url(self):
         kafka_broker_uri = os.environ['KAFKA_BROKER_URI']
         if kafka_broker_uri is None:
             raise Exception('KAFKA_BROKER_URI is not set')
         return kafka_broker_uri.split(',')
-    
+
     def open(self):
         pass
 
     def export_items(self, items):
+        futures = []
         for item in items:
-            self.export_item(item)
+            futures.append(self.export_item(item))
+
+        futures = [f for f in futures if f is not None]  # filter out None values
+
+        # wait for all messages to be sent
+        for item_topic, item_id, future in futures:
+            try:
+                res = future.get(timeout=10)
+                if res:
+                    self.mark_processed(item_topic, item_id)
+                else:
+                    logging.error('Empty response received')
+            except Exception as e:
+                logging.error(f'Failed to send message: {e}')
+                raise e
 
     def export_item(self, item):
         item_type = item.get('type')
-        if item_type is not None and item_type in self.item_type_to_topic_mapping:
-            data = json.dumps(item).encode('utf-8')
-            # logging.debug(data)
-            return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
-        else:
+        item_id = item.get('id')
+
+        if (item_id is None) or (item_type is None) or (item_type not in self.item_type_to_topic_mapping):
             logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
+            return
+
+        item_topic = self.item_type_to_topic_mapping[item_type]
+        data = self.parse_data(item)
+
+        if self.enable_deduplication:
+            if not self.already_processed(item_topic, item_id):
+                logging.info(f'Processing message of Type=[{item_type}]; Id=[{item_id}]')
+                return item_topic, item_id, self.produce_message(item_topic, data)
+            logging.info(f'Message was already processed, skipping... Type=[{item_type}]; Id=[{item_id}]')
+        else:
+            return item_topic, item_id, self.produce_message(item_topic, data)
 
     def convert_items(self, items):
         for item in items:
             yield self.converter.convert_item(item)
 
     def close(self):
+        if self.redis is not None:
+            self.redis.close()
         pass
 
+    # utility function to mark a message as processed in Redis
+    def mark_processed(self, item_type, item_id):
+        if self.redis is not None:
+            return self.redis.add_to_set(item_type, item_id)
+        return False
+
+    # utility function to check whether a message was already processed
+    def already_processed(self, item_type, item_id):
+        if self.redis is not None:
+            return self.redis.exists_in_set(item_type, item_id)
+        return False
+
+    # utility function to produce a message to Kafka
+    def produce_message(self, item_type, data):
+        return self.producer.send(item_type, value=data)
+
+    # utility function to convert numeric data to string format
+    def parse_data(self, item):
+        data = convert_numeric_to_string(item)
+        return json.dumps(data).encode('utf-8')
+
 
 def group_by_item_type(items):
     result = collections.defaultdict(list)
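
Review note: the exporter above sends each record, waits for the broker acknowledgement, and only then marks the id as processed in Redis. A minimal sketch of that send-then-ack-then-mark pattern, assuming a local broker; the topic name and ids are made up and an in-memory set stands in for Redis:

    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])  # placeholder broker
    processed = set()  # stand-in for the Redis-backed cache

    def send_item(topic, item):
        if item['id'] in processed:
            return None  # duplicate: skip, mirroring already_processed()
        return item['id'], producer.send(topic, value=json.dumps(item).encode('utf-8'))

    pending = [send_item('demo_blocks', {'id': 'block_19000000', 'type': 'block'})]
    for item_id, future in filter(None, pending):
        future.get(timeout=10)   # block until the broker acks the write
        processed.add(item_id)   # mark as processed only after a successful send
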
diff --git a/blockchainetl/streaming/streamer.py b/blockchainetl/streaming/streamer.py
index 043f93b25..343ff598a 100644
--- a/blockchainetl/streaming/streamer.py
+++ b/blockchainetl/streaming/streamer.py
@@ -63,8 +63,10 @@ def __init__(
         self.prometheus_client = PrometheusConnector()
 
         if self.mode == constants.RUN_MODE_NORMAL:
-            if self.start_block is not None or not os.path.isfile(self.last_synced_block_file):
-                init_last_synced_block_file((self.start_block or 0) - 1, self.last_synced_block_file)
+            # if "start-block" is provided and "last_synced_block_file" is not present,
+            # then write "start-block - 1" to "last_synced_block_file"
+            if (self.start_block is not None) and (not os.path.isfile(self.last_synced_block_file)):
+                write_last_synced_block(self.last_synced_block_file, self.start_block - 1)
 
             self.last_synced_block = read_last_synced_block(self.last_synced_block_file)
 
@@ -102,6 +104,7 @@ def _do_stream_correction(self):
             time.sleep(self.period_seconds)
 
     def _do_stream(self):
+        logging.info('End Block={} LastSyncedBlock={}'.format(self.end_block, self.last_synced_block))
         while (self.end_block is None or self.last_synced_block < self.end_block):
             synced_blocks = 0
             try:
@@ -162,15 +165,6 @@ def write_last_synced_block(file, last_synced_block):
         write_to_file(file, str(last_synced_block) + '\n')
 
 
-def init_last_synced_block_file(start_block, last_synced_block_file):
-    if os.path.isfile(last_synced_block_file):
-        raise ValueError(
-            '{} should not exist if --start-block option is specified. '
-            'Either remove the {} file or the --start-block option.'
-            .format(last_synced_block_file, last_synced_block_file))
-    write_last_synced_block(last_synced_block_file, start_block)
-
-
 def read_last_synced_block(file):
     with smart_open(file, 'r') as last_synced_block_file:
         return int(last_synced_block_file.read())
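
Review note: the new resume behaviour can be summarised as "seed the sync-position file from --start-block only when the file does not exist yet, then always read the position back from the file". A rough stand-alone sketch of that rule; the file name and block number are illustrative:

    import os

    def resolve_last_synced_block(path, start_block):
        # seed the file only on the very first run; afterwards the file wins
        if start_block is not None and not os.path.isfile(path):
            with open(path, 'w') as f:
                f.write(str(start_block - 1) + '\n')
        with open(path) as f:
            return int(f.read())

    # first run: creates the file and returns 99; later runs return whatever the file holds
    print(resolve_last_synced_block('last_synced_block.txt', 100))
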
diff --git a/ethereumetl/cli/stream.py b/ethereumetl/cli/stream.py
index 98d82f56d..9c2f7d0d3 100644
--- a/ethereumetl/cli/stream.py
+++ b/ethereumetl/cli/stream.py
@@ -32,8 +32,10 @@
 from ethereumetl.thread_local_proxy import ThreadLocalProxy
 from ethereumetl.constants import constants
 
+from prometheus_client import start_http_server
+
 
 @click.command(context_settings=dict(help_option_names=['-h', '--help']))
-@click.option('-l', '--last-synced-block-file', default='last_synced_block.txt', show_default=True, type=str, help='')
+@click.option('-l', '--last-synced-block-file', required=True, type=str, help='Stores the last synced block number and is used to continue the sync in case of restarts. If both "--last-synced-block-file" and "--start-block" are provided, the block number in "--last-synced-block-file" takes precedence')
 @click.option('--lag', default=0, show_default=True, type=int, help='The number of blocks to lag behind the network.')
 @click.option('-o', '--output', type=str,
               help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; '
@@ -42,7 +44,7 @@
                    'or kafka, output name and connection host:port e.g. kafka/127.0.0.1:9092 '
                    'or Kinesis, e.g. kinesis://your-data-stream-name'
                    'If not specified will print to console')
-@click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block')
+@click.option('-s', '--start-block', required=True, type=int, help='Start block')
 @click.option('-E', '--end-block', default=None, show_default=True, type=int, help='End block')
 @click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str,
               help='The list of entity types to export.')
@@ -63,16 +65,9 @@ def stream(last_synced_block_file, lag, output, start_block, end_block, entity_t
     from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapter
     from blockchainetl.streaming.streamer import Streamer
 
-
-    if os.environ['BLOCKCHAIN'] == None:
-        raise ValueError('BLOCKCHAIN env is missing')
+    check_required_envs()
 
     provider_uri = os.environ['PROVIDER_URI']
-    if provider_uri == None:
-        raise ValueError('PROVIDER_URI env is missing')
-
-    if os.environ['KAFKA_BROKER_URI'] == None:
-        raise ValueError('KAFKA_BROKER_URI env is missing')
 
     if mode == constants.RUN_MODE_CORRECTION:
         blocks_to_reprocess = [int(block) for block in blocks_to_reprocess.split(',')]
@@ -105,9 +100,15 @@ def stream(last_synced_block_file, lag, output, start_block, end_block, entity_t
     streamer.stream()
 
 
+def check_required_envs():
+    for env in constants.REQUIRED_ENVS:
+        if os.environ.get(env) is None:
+            raise ValueError(f'{env} env is missing')
+
+
 def parse_entity_types(entity_types):
     entity_types = [c.strip() for c in entity_types.split(',')]
-    
+
     # validate passed types
     for entity_type in entity_types:
         if entity_type not in EntityType.ALL_FOR_STREAMING:
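
Review note: check_required_envs() is meant to fail fast before any job starts (it uses os.environ.get() so a missing variable raises the intended ValueError rather than a KeyError). An equivalent stand-alone pre-flight check, with the env list abbreviated for illustration:

    import os

    REQUIRED_ENVS = ['BLOCKCHAIN', 'PROVIDER_URI', 'KAFKA_BROKER_URI']  # abbreviated

    missing = [name for name in REQUIRED_ENVS if os.environ.get(name) is None]
    if missing:
        raise ValueError(f"missing required envs: {', '.join(missing)}")
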
diff --git a/ethereumetl/constants/constants.py b/ethereumetl/constants/constants.py
index f645fd966..a45bd5f02 100644
--- a/ethereumetl/constants/constants.py
+++ b/ethereumetl/constants/constants.py
@@ -1,3 +1,4 @@
+from ethereumetl.enumeration.entity_type import EntityType
 
 TOKEN_TYPE_ERC20 = 'ERC20'
 TOKEN_TYPE_ERC721 = 'ERC721'
@@ -42,4 +43,48 @@
 RUN_MODE_CORRECTION = 'correction'
 RUN_MODE_NORMAL= 'normal'
 
-METRICS_PORT = '9000'
\ No newline at end of file
+# variables for deduplication
+REDIS_PREFIX = 'etl'
+METRICS_PORT = '9000'
+CLICKHOUSE_FALLBACK_DAYS = '1'
+CLICKHOUSE_QUERY_CHUNK_SIZE = 1500
+
+REQUIRED_ENVS = [
+    # envs for kafka integration
+    'BLOCKCHAIN',
+    'PROVIDER_URI',
+    'KAFKA_BROKER_URI',
+
+    # envs for deduplication support
+    'REDIS_HOST',
+    'REDIS_PORT',
+    'REDIS_DB',
+    'REDIS_MESSAGE_TTL',
+    'CLICKHOUSE_HOST',
+    'CLICKHOUSE_PORT',
+    'CLICKHOUSE_USERNAME',
+    'CLICKHOUSE_PASSWORD',
+    'CLICKHOUSE_DATABASE'
+]
+
+ENTITY_TO_TABLE_MAP = {
+    EntityType.BLOCK: 'blocks',
+    EntityType.TRANSACTION: 'transactions',
+    EntityType.LOG: 'logs',
+    EntityType.TOKEN_TRANSFER: 'token_transfers',
+    EntityType.TRACE: 'traces',
+    EntityType.GETH_TRACE: 'traces',
+    EntityType.CONTRACT: 'contracts',
+    EntityType.TOKEN: 'enriched_contracts',
+}
+
+ENTITY_TO_TABLE_TS_COLUMNS_MAP = {
+    EntityType.BLOCK: 'timestamp',
+    EntityType.TOKEN: 'block_timestamp',
+    EntityType.TRANSACTION: 'block_timestamp',
+    EntityType.LOG: 'block_timestamp',
+    EntityType.TOKEN_TRANSFER: 'block_timestamp',
+    EntityType.TRACE: 'block_timestamp',
+    EntityType.GETH_TRACE: 'block_timestamp',
+    EntityType.CONTRACT: 'block_timestamp',
+}
diff --git a/ethereumetl/deduplication/clickhouse.py b/ethereumetl/deduplication/clickhouse.py
new file mode 100644
index 000000000..675bbdb42
--- /dev/null
+++ b/ethereumetl/deduplication/clickhouse.py
@@ -0,0 +1,46 @@
+import os
+import clickhouse_connect
+import logging
+
+class Clickhouse:
+    """
+    Clickhouse Connector
+    """
+
+    def __init__(self):
+        """
+        Connect to the database and provide its client
+        :param host:
+        :param port:
+        :param username:
+        :param password:
+        :param database:
+        """
+        logging.debug('Connecting to clickhouse !!')
+
+        self._host = os.environ['CLICKHOUSE_HOST']
+        self._port = os.environ['CLICKHOUSE_PORT']
+        self._username = os.environ['CLICKHOUSE_USERNAME']
+        self._password = os.environ['CLICKHOUSE_PASSWORD']
+        self._database = os.environ['CLICKHOUSE_DATABASE']
+
+        logging.debug(
+            f'Making Connection to DB with host: {self._host}, port: {self._port}, database: {self._database}'
+        )
+        self._connection = clickhouse_connect.get_client(host=self._host, port=self._port, secure=True,
+                                                          username=self._username, password=self._password, database=self._database)
+
+
+    async def run_query(self, query, parameters):
+        """Function to run query on clickhouse
+
+        Args:
+            query (str): query to run
+            parameters (dict): variable parameters
+
+        Returns:
+            list: fetched data
+        """
+        logging.debug(f'Running SQL Query {query}')
+        result = self._connection.query(query=query, parameters=parameters)
+        return result.result_rows
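
Review note: the connector above is a thin wrapper around clickhouse-connect. A rough usage sketch with placeholder host and credentials, using the same server-side parameter-binding syntax as the dedup query that follows:

    import clickhouse_connect

    client = clickhouse_connect.get_client(host='localhost', port=8123,
                                           username='default', password='', database='default')
    result = client.query(
        'SELECT id FROM {table:Identifier} WHERE id IN {ids:Array(String)}',
        parameters={'table': 'blocks', 'ids': ['block_1', 'block_2']},
    )
    print(result.result_rows)  # e.g. [('block_1',)] for ids that already exist
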
diff --git a/ethereumetl/deduplication/deduplicate.py b/ethereumetl/deduplication/deduplicate.py
new file mode 100644
index 000000000..6d55e89bb
--- /dev/null
+++ b/ethereumetl/deduplication/deduplicate.py
@@ -0,0 +1,72 @@
+import os
+import logging
+import asyncio
+from datetime import datetime
+from ethereumetl.constants import constants
+
+def deduplicate_records(records, ts_key, db):
+    logging.info('Request to check for deduplication')
+    ch_fallback_days = int(os.environ.get('CLICKHOUSE_FALLBACK_DAYS', constants.CLICKHOUSE_FALLBACK_DAYS))
+
+    if records is None or len(records) == 0:
+        return records
+
+    min_ts = get_minimum_ts(records, ts_key)
+    logging.info(f'Minimum ts of records to check for deduplication - {min_ts}')
+
+    if is_ts_older(min_ts, ch_fallback_days):
+        logging.info('Fetching details from Clickhouse for records')
+        records = asyncio.run(filter_records(records, min_ts, db))
+
+        logging.info('Fetched details from Clickhouse for records')
+    return records
+
+def is_ts_older(ts, days):
+    difference = datetime.utcnow() - datetime.utcfromtimestamp(ts)
+    return difference.days > days
+
+async def filter_records(items, min_ts_epoch, db):
+    if items is None or len(items) == 0:
+        return items
+
+    message_type = items[0].get('type')
+    skip_message_types = os.environ.get('CLICKHOUSE_SKIP_FOR_MESSAGE_TYPES')
+
+    if skip_message_types is not None and (message_type in skip_message_types.split(',')):
+        logging.info(f'Ignoring check for deduplication for type {message_type} as it is in the skip list')
+        return items
+
+    table_name = get_table_name(message_type)
+    if table_name is None:
+        logging.error(f'Ignoring check for deduplication for type {message_type} as table not found')
+        os._exit(1)
+
+    ts_column_name = get_table_ts_column_name(message_type)
+    min_ts = datetime.utcfromtimestamp(min_ts_epoch).strftime('%Y-%m-%d')
+
+    # extract all ids
+    ids = list([obj["id"] for obj in items])
+    ids_from_db = {}
+
+    parameters = { 'table': table_name, 'ids': [], 'timestamp_key': ts_column_name, 'block_timestamp': min_ts }
+    query = '''SELECT id FROM {table:Identifier} WHERE id IN {ids:Array(String)} and {timestamp_key:Identifier} >= {block_timestamp:String}'''
+
+    chunk_size = int(os.environ.get('CLICKHOUSE_QUERY_CHUNK_SIZE', constants.CLICKHOUSE_QUERY_CHUNK_SIZE))
+    for i in range(0, len(ids), chunk_size):
+        chunk = ids[i:i + chunk_size]
+        parameters['ids'] = chunk
+
+        db_results = await db.run_query(query, parameters)
+        ids_from_db.update({t[0]: True for t in db_results})
+
+    return [item for item in items if item['id'] not in ids_from_db]
+
+def get_table_name(message_type):
+    return constants.ENTITY_TO_TABLE_MAP.get(message_type)
+
+def get_table_ts_column_name(message_type):
+    return constants.ENTITY_TO_TABLE_TS_COLUMNS_MAP.get(message_type)
+
+def get_minimum_ts(items, key):
+    # get timestamp of oldest message from items list
+    record = min(items, key=lambda x: x.get(key, float('inf')))
+    return record.get(key)
diff --git a/ethereumetl/deduplication/redis.py b/ethereumetl/deduplication/redis.py
new file mode 100644
index 000000000..534c8f020
--- /dev/null
+++ b/ethereumetl/deduplication/redis.py
@@ -0,0 +1,27 @@
+import os
+import hashlib
+import redis
+from ethereumetl.constants import constants
+
+class RedisConnector:
+    def __init__(self):
+        self.ttl = os.environ['REDIS_MESSAGE_TTL']
+
+        redis_host = os.environ['REDIS_HOST']
+        redis_port = os.environ['REDIS_PORT']
+        redis_database = os.environ['REDIS_DB']
+
+        self.redis_client = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_database)
+
+    def exists_in_set(self, key, value):
+        return self.redis_client.exists(self.create_key(key, value))
+
+    def add_to_set(self, key, value):
+        return self.redis_client.setex(self.create_key(key, value), self.ttl, '1')
+
+    def create_key(self, key, value):
+        hashed_data = hashlib.sha1(f"{key}_{value}".encode()).hexdigest()
+        return f"{constants.REDIS_PREFIX}_{hashed_data}"
+
+    def close(self):
+        self.redis_client.close()
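
Review note: the Redis connector reduces each (topic, id) pair to a hashed key with a TTL, so the de-dup cache cannot grow without bound. A minimal sketch of that keying scheme, assuming a local Redis; the prefix, topic, id and TTL values are illustrative:

    import hashlib
    import redis

    r = redis.StrictRedis(host='localhost', port=6379, db=0)

    def cache_key(topic, item_id, prefix='etl'):
        digest = hashlib.sha1(f'{topic}_{item_id}'.encode()).hexdigest()
        return f'{prefix}_{digest}'

    key = cache_key('ethereum_blocks', 'block_19000000')
    if not r.exists(key):
        r.setex(key, 3600, '1')  # remember the message for one hour
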
"after_evm_transfers" - An array representing EVM transfers that occurred after the execution of the transaction + # Reference: https://www.quicknode.com/docs/arbitrum/debug_traceBlockByHash + self.before_evm_transfers = None + self.after_evm_transfers = None self.block_number = None self.transaction_hash = None self.transaction_index = None diff --git a/ethereumetl/jobs/export_geth_traces_job.py b/ethereumetl/jobs/export_geth_traces_job.py index 20f24a8a2..34b8cc09b 100644 --- a/ethereumetl/jobs/export_geth_traces_job.py +++ b/ethereumetl/jobs/export_geth_traces_job.py @@ -73,6 +73,7 @@ def _export_batch(self, block_number_batch): # add tx_hash to tx_trace for obj in result: + # TODO: fix this error obj['result']['tx_hash'] = obj.get('txHash') trace_error = obj.get('result').get('error') if trace_error is not None: diff --git a/ethereumetl/jobs/extract_contracts_job.py b/ethereumetl/jobs/extract_contracts_job.py index 1c1b3e84a..5248567b0 100644 --- a/ethereumetl/jobs/extract_contracts_job.py +++ b/ethereumetl/jobs/extract_contracts_job.py @@ -59,7 +59,7 @@ def _extract_contracts(self, traces): trace['block_number'] = to_int_or_none(trace.get('block_number')) contract_creation_traces = [trace for trace in traces - if trace.get('trace_type') == 'create' and trace.get('to_address') is not None + if (trace.get('trace_type') == 'create' or trace.get('trace_type') == 'create2') and trace.get('to_address') is not None and len(trace.get('to_address')) > 0 and trace.get('status') == 1] contracts = [] diff --git a/ethereumetl/mappers/trace_mapper.py b/ethereumetl/mappers/trace_mapper.py index 5a6ab9dc6..0a00f1f33 100644 --- a/ethereumetl/mappers/trace_mapper.py +++ b/ethereumetl/mappers/trace_mapper.py @@ -30,6 +30,8 @@ class EthTraceMapper(object): def json_dict_to_trace(self, json_dict): trace = EthTrace() + trace.before_evm_transfers = json_dict.get('beforeEVMTransfers') + trace.after_evm_transfers = json_dict.get('afterEVMTransfers') trace.block_number = json_dict.get('blockNumber') trace.transaction_hash = json_dict.get('transactionHash') trace.transaction_index = json_dict.get('transactionPosition') @@ -52,7 +54,7 @@ def json_dict_to_trace(self, json_dict): trace.trace_type = trace_type # common fields in call/create - if trace_type in ('call', 'create'): + if trace_type in ('call', 'create', 'create2'): trace.from_address = to_normalized_address(action.get('from')) trace.value = hex_to_dec(action.get('value')) trace.gas = hex_to_dec(action.get('gas')) @@ -68,6 +70,10 @@ def json_dict_to_trace(self, json_dict): trace.to_address = result.get('address') trace.input = action.get('init') trace.output = result.get('code') + elif trace_type == 'create2': + trace.to_address = result.get('address') + trace.input = action.get('init') + trace.output = result.get('code') elif trace_type == 'suicide': trace.from_address = to_normalized_address(action.get('address')) trace.to_address = to_normalized_address(action.get('refundAddress')) @@ -126,6 +132,10 @@ def daofork_state_change_to_trace(self, state_change): def _iterate_transaction_trace(self, block_number, tx_index, tx_trace, trace_address=[]): trace = EthTrace() + + trace.before_evm_transfers = tx_trace.get('beforeEVMTransfers') + trace.after_evm_transfers = tx_trace.get('afterEVMTransfers') + trace.transaction_hash = tx_trace.get('tx_hash') trace.status = tx_trace.get('status') trace.block_number = block_number @@ -174,6 +184,8 @@ def _iterate_transaction_trace(self, block_number, tx_index, tx_trace, trace_add def trace_to_dict(self, trace, 
diff --git a/ethereumetl/streaming/enrich.py b/ethereumetl/streaming/enrich.py
index eb69404e3..f2c6a6278 100644
--- a/ethereumetl/streaming/enrich.py
+++ b/ethereumetl/streaming/enrich.py
@@ -199,6 +199,8 @@ def enrich_geth_traces(blocks, traces):
         [
             'type',
             'transaction_index',
+            'before_evm_transfers',
+            'after_evm_transfers',
             'from_address',
             'to_address',
             'value',
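
Review note: enrich_geth_traces joins block fields onto each geth trace by block number, and the two new Arbitrum transfer fields are simply carried through that join. A deliberately simplified toy illustration with made-up values (the real join in enrich.py selects a larger field set):

    blocks = [{'number': 1, 'timestamp': 1700000000, 'hash': '0xblockhash'}]
    geth_traces = [{'block_number': 1, 'transaction_hash': '0xtxhash',
                    'before_evm_transfers': [], 'after_evm_transfers': []}]

    blocks_by_number = {b['number']: b for b in blocks}
    enriched = [
        {**t,
         'block_timestamp': blocks_by_number[t['block_number']]['timestamp'],
         'block_hash': blocks_by_number[t['block_number']]['hash']}
        for t in geth_traces
    ]
    print(enriched[0]['block_timestamp'])  # 1700000000
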
diff --git a/ethereumetl/streaming/eth_streamer_adapter.py b/ethereumetl/streaming/eth_streamer_adapter.py
index d8a065528..91f609659 100644
--- a/ethereumetl/streaming/eth_streamer_adapter.py
+++ b/ethereumetl/streaming/eth_streamer_adapter.py
@@ -1,7 +1,9 @@
 import logging
+import os
 
 from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
 from blockchainetl.jobs.exporters.in_memory_item_exporter import InMemoryItemExporter
+from ethereumetl.deduplication.clickhouse import Clickhouse
 from ethereumetl.enumeration.entity_type import EntityType
 from ethereumetl.jobs.export_blocks_job import ExportBlocksJob
 from ethereumetl.jobs.export_receipts_job import ExportReceiptsJob
@@ -18,6 +20,7 @@
 from ethereumetl.thread_local_proxy import ThreadLocalProxy
 from ethereumetl.web3_utils import build_web3
 from ethereumetl.providers.auto import get_provider_from_uri
+from ethereumetl.deduplication.deduplicate import deduplicate_records
 
 
 class EthStreamerAdapter:
@@ -33,6 +36,11 @@ def __init__(
         self.batch_size = batch_size
         self.max_workers = max_workers
         self.entity_types = entity_types
+
+        self.clickhouse_db = None
+        if os.environ.get('ENABLE_DEDUPLICATION') is not None:
+            self.clickhouse_db = Clickhouse()
+
         self.item_id_calculator = EthItemIdCalculator()
         self.item_timestamp_calculator = EthItemTimestampCalculator()
 
@@ -80,36 +88,46 @@ def export_all(self, start_block, end_block):
         if self._should_export(EntityType.TOKEN):
             tokens = self._extract_tokens(contracts)
 
-        enriched_blocks = blocks \
+        enriched_blocks = self.calculate_item_ids(blocks) \
             if EntityType.BLOCK in self.entity_types else []
-        enriched_transactions = enrich_transactions(transactions, receipts) \
+        enriched_transactions = self.calculate_item_ids(enrich_transactions(transactions, receipts)) \
             if EntityType.TRANSACTION in self.entity_types else []
-        enriched_logs = enrich_logs(blocks, logs) \
+        enriched_logs = self.calculate_item_ids(enrich_logs(blocks, logs)) \
            if EntityType.LOG in self.entity_types else []
-        enriched_token_transfers = enrich_token_transfers(blocks, token_transfers) \
+        enriched_token_transfers = self.calculate_item_ids(enrich_token_transfers(blocks, token_transfers)) \
            if EntityType.TOKEN_TRANSFER in self.entity_types else []
-        enriched_traces = enrich_traces(blocks, traces) \
+        enriched_traces = self.calculate_item_ids(enrich_traces(blocks, traces)) \
            if EntityType.TRACE in self.entity_types else []
-        enriched_geth_traces = enrich_geth_traces(blocks, geth_traces) \
+        enriched_geth_traces = self.calculate_item_ids(enrich_geth_traces(blocks, geth_traces)) \
            if EntityType.GETH_TRACE in self.entity_types else []
-        enriched_contracts = enrich_contracts(blocks, contracts) \
+        enriched_contracts = self.calculate_item_ids(enrich_contracts(blocks, contracts)) \
            if EntityType.CONTRACT in self.entity_types else []
-        enriched_tokens = enrich_tokens(blocks, tokens) \
+        enriched_tokens = self.calculate_item_ids(enrich_tokens(blocks, tokens)) \
            if EntityType.TOKEN in self.entity_types else []
 
         logging.info('Exporting with ' + type(self.item_exporter).__name__)
+
+        if os.environ.get('ENABLE_DEDUPLICATION') is not None and os.environ.get('OVERRIDE_CHECK_ALL_IN_CACHE') is None:
+            # TODO: improve this code
+            enriched_logs = deduplicate_records(enriched_logs, 'block_timestamp', self.clickhouse_db)
+            enriched_token_transfers = deduplicate_records(enriched_token_transfers, 'block_timestamp', self.clickhouse_db)
+            enriched_traces = deduplicate_records(enriched_traces, 'block_timestamp', self.clickhouse_db)
+            enriched_geth_traces = deduplicate_records(enriched_geth_traces, 'block_timestamp', self.clickhouse_db)
+            enriched_contracts = deduplicate_records(enriched_contracts, 'block_timestamp', self.clickhouse_db)
+            enriched_tokens = deduplicate_records(enriched_tokens, 'block_timestamp', self.clickhouse_db)
+            enriched_transactions = deduplicate_records(enriched_transactions, 'block_timestamp', self.clickhouse_db)
+            enriched_blocks = deduplicate_records(enriched_blocks, 'timestamp', self.clickhouse_db)
 
         all_items = \
-            sort_by(enriched_blocks, 'number') + \
-            sort_by(enriched_transactions, ('block_number', 'transaction_index')) + \
             sort_by(enriched_logs, ('block_number', 'log_index')) + \
             sort_by(enriched_token_transfers, ('block_number', 'log_index')) + \
             sort_by(enriched_traces, ('block_number', 'trace_index')) + \
             sort_by(enriched_geth_traces, ('block_number', 'trace_index')) + \
             sort_by(enriched_contracts, ('block_number',)) + \
-            sort_by(enriched_tokens, ('block_number',))
+            sort_by(enriched_tokens, ('block_number',)) + \
+            sort_by(enriched_transactions, ('block_number', 'transaction_index')) + \
+            sort_by(enriched_blocks, 'number')
 
-        self.calculate_item_ids(all_items)
         #self.calculate_item_timestamps(all_items)
 
         self.item_exporter.export_items(all_items)
@@ -253,6 +271,7 @@ def _should_export(self, entity_type):
     def calculate_item_ids(self, items):
         for item in items:
             item['id'] = self.item_id_calculator.calculate(item)
+        return items
 
     def calculate_item_timestamps(self, items):
         for item in items:
diff --git a/ethereumetl/streaming/item_exporter_creator.py b/ethereumetl/streaming/item_exporter_creator.py
index b712ed638..914d29daa 100644
--- a/ethereumetl/streaming/item_exporter_creator.py
+++ b/ethereumetl/streaming/item_exporter_creator.py
@@ -30,7 +30,7 @@ def create_item_exporters(outputs):
         split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']
     else:
         split_outputs = [outputs]
-    
+
     item_exporters = [create_item_exporter(output) for output in split_outputs]
     return MultiItemExporter(item_exporters)
 
@@ -89,7 +89,7 @@ def create_item_exporter(output):
     elif item_exporter_type == ItemExporterType.KAFKA:
         from blockchainetl.jobs.exporters.kafka_exporter import KafkaItemExporter
         blockchain = os.environ['BLOCKCHAIN']
-        item_exporter = KafkaItemExporter( item_type_to_topic_mapping={
+        item_exporter = KafkaItemExporter(item_type_to_topic_mapping={
             'block': blockchain + '_blocks',
             'transaction': blockchain + '_transactions',
             'log': blockchain + '_logs',
diff --git a/ethereumetl/utils.py b/ethereumetl/utils.py
index 51c547883..b7bc0f06e 100644
--- a/ethereumetl/utils.py
+++ b/ethereumetl/utils.py
@@ -27,6 +27,17 @@
 from ethereumetl.misc.retriable_value_error import RetriableValueError
 from typing import List, Union
 
+def convert_numeric_to_string(obj):
+    if isinstance(obj, dict):
+        for key, value in obj.items():
+            obj[key] = convert_numeric_to_string(value)
+    elif isinstance(obj, list):
+        for i in range(len(obj)):
+            obj[i] = convert_numeric_to_string(obj[i])
+    elif isinstance(obj, (int, float)) and not isinstance(obj, bool):
+        obj = str(obj)
+    return obj
+
 def hex_to_dec(hex_string):
     if hex_string is None:
         return None
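
Review note: convert_numeric_to_string walks nested dicts and lists, stringifying ints and floats while leaving booleans and other types untouched. For example (values made up):

    from ethereumetl.utils import convert_numeric_to_string

    item = {'number': 19000000, 'base_fee': 12.5, 'is_reorg': False, 'ids': [1, 2]}
    print(convert_numeric_to_string(item))
    # {'number': '19000000', 'base_fee': '12.5', 'is_reorg': False, 'ids': ['1', '2']}
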
diff --git a/setup.py b/setup.py
index 7d51e3bd8..868251754 100644
--- a/setup.py
+++ b/setup.py
@@ -56,7 +56,9 @@ def read(fname):
         'libcst==0.3.21', # Later versions break the build in Travis CI for Python 3.7.2
         'grpcio==1.46.3',
-        'prometheus-client==0.20.0'
+        'redis==3.5.3',
+        'prometheus-client==0.20.0',
+        'clickhouse-connect==0.6.14'
     ],
     'streaming-kinesis': [
         'boto3==1.24.11',