From e052344a99de1b2a9bfbafb46911ed84bd9b8862 Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 15 Jul 2024 12:54:01 +0900 Subject: [PATCH] feat: remove onchain aggregator related codes --- cli/src/adapter.ts | 129 ------- cli/src/aggregator.ts | 319 ---------------- cli/src/datafeed.ts | 494 ------------------------ cli/src/fetcher.ts | 152 -------- cli/src/index.ts | 12 - cli/src/settings.ts | 16 +- cli/src/utils.ts | 22 -- cli/test/adapter.ts | 47 --- cli/test/aggregator.ts | 69 ---- core/package.json | 3 - core/src/listener/data-feed.ts | 103 ----- core/src/reporter/data-feed.ts | 32 -- core/src/worker/data-feed.ts | 584 ----------------------------- core/src/worker/data-feed.utils.ts | 134 ------- core/test/data-feed-worker.test.ts | 30 -- 15 files changed, 4 insertions(+), 2142 deletions(-) delete mode 100644 cli/src/adapter.ts delete mode 100644 cli/src/aggregator.ts delete mode 100644 cli/src/datafeed.ts delete mode 100644 cli/src/fetcher.ts delete mode 100644 cli/test/adapter.ts delete mode 100644 cli/test/aggregator.ts delete mode 100644 core/src/listener/data-feed.ts delete mode 100644 core/src/reporter/data-feed.ts delete mode 100644 core/src/worker/data-feed.ts delete mode 100644 core/src/worker/data-feed.utils.ts delete mode 100644 core/test/data-feed-worker.test.ts diff --git a/cli/src/adapter.ts b/cli/src/adapter.ts deleted file mode 100644 index d5116f11e..000000000 --- a/cli/src/adapter.ts +++ /dev/null @@ -1,129 +0,0 @@ -import axios from 'axios' -import { boolean as cmdboolean, command, flag, option, subcommands } from 'cmd-ts' -import { IAdapter, ReadFile } from './cli-types.js' -import { ORAKL_NETWORK_API_URL } from './settings.js' -import { buildUrl, idOption, isOraklNetworkApiHealthy } from './utils.js' - -const ADAPTER_ENDPOINT = buildUrl(ORAKL_NETWORK_API_URL, 'adapter') - -export function adapterSub() { - // adapter list - // adapter insert --source ${source} - // adapter remove --id ${id} - // adapter hash --source ${source} --verify - - const list = command({ - name: 'list', - args: {}, - handler: listHandler(true), - }) - - const insert = command({ - name: 'insert', - args: { - data: option({ - type: ReadFile, - long: 'source', - }), - }, - handler: insertHandler(), - }) - - const remove = command({ - name: 'remove', - args: { - id: idOption, - }, - handler: removeHandler(), - }) - - const hash = command({ - name: 'hash', - args: { - verify: flag({ - type: cmdboolean, - long: 'verify', - }), - data: option({ - type: ReadFile, - long: 'source', - }), - }, - handler: hashHandler(), - }) - - return subcommands({ - name: 'adapter', - cmds: { list, insert, remove, hash }, - }) -} - -export function listHandler(print?: boolean) { - async function wrapper() { - if (!(await isOraklNetworkApiHealthy())) return - - try { - const result = (await axios.get(ADAPTER_ENDPOINT)).data - if (print) { - console.dir(result, { depth: null }) - } - return result - } catch (e) { - console.dir(e?.response?.data, { depth: null }) - } - } - return wrapper -} - -export function insertHandler() { - async function wrapper({ data }: { data }) { - if (!(await isOraklNetworkApiHealthy())) return - - try { - const response = (await axios.post(ADAPTER_ENDPOINT, data)).data - console.dir(response, { depth: null }) - return response - } catch (e) { - console.error('Adapter was not inserted. Reason:') - console.error(e?.response?.data) - return e?.response?.data - } - } - return wrapper -} - -export function removeHandler() { - async function wrapper({ id }: { id: number }) { - if (!(await isOraklNetworkApiHealthy())) return - - try { - const endpoint = buildUrl(ADAPTER_ENDPOINT, id.toString()) - const response = (await axios.delete(endpoint)).data - console.dir(response, { depth: null }) - } catch (e) { - console.error('Adapter was not deleted. Reason:') - console.error(e?.response?.data?.message) - } - } - return wrapper -} - -export function hashHandler() { - async function wrapper({ data, verify }: { data; verify: boolean }) { - try { - const endpoint = buildUrl(ADAPTER_ENDPOINT, 'hash') - const adapter = data as IAdapter - const adapterWithCorrectHash = (await axios.post(endpoint, adapter, { params: { verify } })) - .data - console.dir(adapterWithCorrectHash, { depth: null }) - return adapterWithCorrectHash - } catch (e) { - console.error('Adapter hash could not be computed. Reason:') - const errMsg = e?.response?.data ? e.response.data : e.message - - console.error(errMsg) - return errMsg - } - } - return wrapper -} diff --git a/cli/src/aggregator.ts b/cli/src/aggregator.ts deleted file mode 100644 index 7c419cc43..000000000 --- a/cli/src/aggregator.ts +++ /dev/null @@ -1,319 +0,0 @@ -import axios from 'axios' -import { - boolean as cmdboolean, - command, - flag, - option, - string as cmdstring, - subcommands, -} from 'cmd-ts' -import { IAggregator, ReadFile } from './cli-types.js' -import { ORAKL_NETWORK_API_URL, WORKER_SERVICE_HOST, WORKER_SERVICE_PORT } from './settings.js' -import { - buildUrl, - chainOptionalOption, - fetcherTypeOptionalOption, - idOption, - isOraklNetworkApiHealthy, - isServiceHealthy, -} from './utils.js' - -const AGGREGATOR_ENDPOINT = buildUrl(ORAKL_NETWORK_API_URL, 'aggregator') - -export function aggregatorSub() { - // aggregator list [--active] [--chain ${chain}] - // aggregator insert --source ${source} --chain ${chain} - // aggregator remove --id ${id} - // aggregator hash --source ${source} --verify - // aggregator active --host ${host} --port ${port} - // aggregator activate --host ${host} --port ${port} --aggregatorHash ${aggregatorHash} - // aggregator deactivate --host ${host} --port ${port} --aggregatorHash ${aggregatorHash} - - const list = command({ - name: 'list', - args: { - active: flag({ - long: 'active', - }), - chain: chainOptionalOption, - }, - handler: listHandler(true), - }) - - const insert = command({ - name: 'insert', - args: { - data: option({ - type: ReadFile, - long: 'source', - }), - chain: option({ - type: cmdstring, - long: 'chain', - }), - fetcherType: fetcherTypeOptionalOption, - }, - handler: insertHandler(), - }) - - const remove = command({ - name: 'remove', - args: { - id: idOption, - }, - handler: removeHandler(), - }) - - const hash = command({ - name: 'hash', - args: { - verify: flag({ - type: cmdboolean, - long: 'verify', - }), - data: option({ - type: ReadFile, - long: 'source', - }), - }, - handler: hashHandler(), - }) - - const active = command({ - name: 'active', - args: { - host: option({ - type: cmdstring, - long: 'host', - defaultValue: () => WORKER_SERVICE_HOST, - }), - port: option({ - type: cmdstring, - long: 'port', - defaultValue: () => String(WORKER_SERVICE_PORT), - }), - }, - handler: activeHandler(), - }) - - const activate = command({ - name: 'activate', - args: { - aggregatorHash: option({ - type: cmdstring, - long: 'aggregatorHash', - }), - host: option({ - type: cmdstring, - long: 'host', - defaultValue: () => WORKER_SERVICE_HOST, - }), - port: option({ - type: cmdstring, - long: 'port', - defaultValue: () => String(WORKER_SERVICE_PORT), - }), - }, - handler: activateHandler(), - }) - - const deactivate = command({ - name: 'deactivate', - - args: { - aggregatorHash: option({ - type: cmdstring, - long: 'aggregatorHash', - }), - host: option({ - type: cmdstring, - long: 'host', - defaultValue: () => WORKER_SERVICE_HOST, - }), - port: option({ - type: cmdstring, - long: 'port', - defaultValue: () => String(WORKER_SERVICE_PORT), - }), - }, - handler: deactivateHandler(), - }) - - return subcommands({ - name: 'aggregator', - cmds: { list, insert, remove, hash, active, activate, deactivate }, - }) -} - -export function listHandler(print?: boolean) { - async function wrapper({ chain, active }: { chain?: string; active?: boolean }) { - if (!(await isOraklNetworkApiHealthy())) return - - // cmd-ts does not allow to set boolean flag to undefined. It can - // be either true of false. When `active` is not set, we assume that - // user wants to see all aggregators. - if (!active) { - active = undefined - } - - try { - const url = new URL(AGGREGATOR_ENDPOINT) - if (active) { - url.searchParams.append('active', 'true') - } - if (chain) { - url.searchParams.append('chain', chain) - } - - const result = (await axios.get(url.toString())).data - if (print) { - console.dir(result, { depth: null }) - } - return result - } catch (e) { - console.dir(e?.response?.data, { depth: null }) - } - } - return wrapper -} - -export function insertHandler() { - async function wrapper({ - data, - chain, - fetcherType, - }: { - data - chain: string - fetcherType?: number - }) { - if (!(await isOraklNetworkApiHealthy())) return - - try { - const result = (await axios.post(AGGREGATOR_ENDPOINT, { ...data, chain, fetcherType })).data - console.dir(result, { depth: null }) - return result - } catch (e) { - console.error('Aggregator was not inserted. Reason:') - console.error(e?.response?.data) - return e?.response?.data - } - } - return wrapper -} - -export function removeHandler() { - async function wrapper({ id }: { id: number }) { - if (!(await isOraklNetworkApiHealthy())) return - - try { - const endpoint = buildUrl(AGGREGATOR_ENDPOINT, id.toString()) - const result = (await axios.delete(endpoint)).data - console.dir(result, { depth: null }) - } catch (e) { - console.error('Aggregator was not deleted. Reason:') - console.error(e?.response?.data?.message) - } - } - return wrapper -} - -export function hashHandler() { - async function wrapper({ data, verify }: { data; verify: boolean }) { - try { - const endpoint = buildUrl(AGGREGATOR_ENDPOINT, 'hash') - const aggregator = data as IAggregator - const aggregatorWithCorrectHash = ( - await axios.post(endpoint, aggregator, { - params: { verify }, - }) - ).data - console.dir(aggregatorWithCorrectHash, { depth: null }) - return aggregatorWithCorrectHash - } catch (e) { - console.error('Aggregator hash could not be computed. Reason:') - const errMsg = e?.response?.data ? e.response.data : e.message - - console.error(errMsg) - return errMsg - } - } - return wrapper -} - -export function activeHandler() { - async function wrapper({ host, port }: { host: string; port: string }) { - const aggregatorServiceEndpoint = `${host}:${port}` - if (!(await isServiceHealthy(aggregatorServiceEndpoint))) return - - const activeAggregatorEndpoint = buildUrl(aggregatorServiceEndpoint, 'active') - - try { - const result = (await axios.get(activeAggregatorEndpoint)).data - console.log(result) - } catch (e) { - console.error(e?.response?.data?.message) - } - } - return wrapper -} - -export function activateHandler() { - async function wrapper({ - host, - port, - aggregatorHash, - }: { - host: string - port: string - aggregatorHash: string - }) { - const aggregatorServiceEndpoint = `${host}:${port}` - if (!(await isServiceHealthy(aggregatorServiceEndpoint))) return - - const activateAggregatorEndpoint = buildUrl( - aggregatorServiceEndpoint, - `activate/${aggregatorHash}`, - ) - - try { - const result = (await axios.get(activateAggregatorEndpoint)).data - console.log(result?.message) - } catch (e) { - console.error('Aggregator was not activated. Reason:') - console.error(e?.response?.data?.message) - throw e - } - } - return wrapper -} - -export function deactivateHandler() { - async function wrapper({ - host, - port, - aggregatorHash, - }: { - host: string - port: string - aggregatorHash: string - }) { - const aggregatorServiceEndpoint = `${host}:${port}` - if (!(await isServiceHealthy(aggregatorServiceEndpoint))) return - - const deactivateAggregatorEndpoint = buildUrl( - aggregatorServiceEndpoint, - `deactivate/${aggregatorHash}`, - ) - - try { - const result = (await axios.get(deactivateAggregatorEndpoint)).data - console.log(result?.message) - } catch (e) { - console.error('Aggregator was not deactivated. Reason:') - console.error(e?.response?.data?.message) - throw e - } - } - return wrapper -} diff --git a/cli/src/datafeed.ts b/cli/src/datafeed.ts deleted file mode 100644 index 0f1f1fc53..000000000 --- a/cli/src/datafeed.ts +++ /dev/null @@ -1,494 +0,0 @@ -import { command, option, subcommands } from 'cmd-ts' -import { insertHandler as adapterInsertHandler } from './adapter.js' -import { - activateHandler as aggregatorActivateHandler, - deactivateHandler as aggregatorDeactivateHandler, - insertHandler as aggregatorInsertHandler, -} from './aggregator.js' -import { - IAdapter, - IAggregator, - IDatafeedBulk, - IDatafeedBulkInsertElement, - ReadFile, - readFileFromSource, -} from './cli-types.js' -import { - contractConnectHandler, - contractInsertHandler, - contractListHandler, - contractRemoveHandler, - functionInsertHandler, - functionListHandler, - functionRemoveHandler, - organizationListHandler, - reporterInsertHandler as delegatorReporterInsertHandler, - reporterListHandler as delegatorReporterListHandler, - reporterRemoveHandler as delegatorReporterRemoveHandler, -} from './delegator.js' -import { - startHandler as fetcherStartHandler, - stopHandler as fetcherStopHandler, -} from './fetcher.js' -import { - activateHandler as listenerActivateHandler, - deactivateHandler as listenerDeactivateHandler, - insertHandler as listenerInsertHandler, - listHandler as listenerListHandler, - removeHandler as listenerRemoveHandler, -} from './listener.js' -import { - activateHandler as reporterActivateHandler, - deactivateHandler as reporterDeactivateHandler, - insertHandler as reporterInsertHandler, - listHandler as reporterListHandler, - removeHandler as reporterRemoveHandler, -} from './reporter.js' -import { - FETCHER_HOST, - FETCHER_PORT, - LISTENER_SERVICE_HOST, - LISTENER_SERVICE_PORT, - REPORTER_SERVICE_HOST, - REPORTER_SERVICE_PORT, - WORKER_SERVICE_HOST, - WORKER_SERVICE_PORT, -} from './settings.js' -import { isValidUrl } from './utils.js' - -export function datafeedSub() { - // datafeed bulk-insert --source ${source} - // datafeed bulk-remove --source ${source} - // datafeed bulk-activate --source ${source} - // datafeed bulk-deactivate --source ${source} - - const insert = command({ - name: 'bulk-insert', - args: { - data: option({ - type: ReadFile, - long: 'source', - }), - }, - handler: bulkInsertHandler(), - }) - - const remove = command({ - name: 'bulk-remove', - args: { - data: option({ - type: ReadFile, - long: 'source', - }), - }, - handler: bulkRemoveHandler(), - }) - - const activate = command({ - name: 'bulk-activate', - args: { - data: option({ - type: ReadFile, - long: 'source', - }), - }, - handler: bulkActivateHandler(), - }) - - const deactivate = command({ - name: 'bulk-deactivate', - args: { - data: option({ - type: ReadFile, - long: 'source', - }), - }, - handler: bulkDeactivateHandler(), - }) - - return subcommands({ - name: 'datafeed', - cmds: { insert, remove, activate, deactivate }, - }) -} - -export function bulkInsertHandler() { - async function wrapper({ data }: { data }) { - const bulkData = data as IDatafeedBulk - - const chain = bulkData?.chain || 'localhost' - const service = bulkData?.service || 'DATA_FEED' - const organization = bulkData?.organization || 'bisonai' - const functionName = bulkData?.functionName || 'submit(uint256,int256)' - const eventName = bulkData?.eventName || 'NewRound' - const organizationId = (await organizationListHandler()()).find( - (_organization) => _organization.name == organization, - ).id - - if (!checkBulkSource(data?.bulk)) { - console.error('invalid json src format') - return - } - - for (const insertElement of bulkData.bulk) { - console.log(`inserting ${insertElement}`) - const adapterData = await readFileFromSource(insertElement.adapterSource) - if (!checkAdapterSource(adapterData)) { - console.error(`invalid adapterData: ${adapterData}, skipping insert`) - continue - } - const aggregatorData = await readFileFromSource(insertElement.aggregatorSource) - if (!checkAggregatorSource(aggregatorData)) { - console.error(`invalid aggregatorData: ${aggregatorData}, skipping insert`) - continue - } - - await adapterInsertHandler()({ data: adapterData }) - await aggregatorInsertHandler()({ data: aggregatorData, chain }) - - const reporterInsertResult = await delegatorReporterInsertHandler()({ - address: insertElement.reporter.walletAddress, - organizationId: Number(organizationId), - }) - const contractInsertResult = await contractInsertHandler()({ - address: aggregatorData.address, - }) - - await functionInsertHandler()({ - name: functionName, - contractId: Number(contractInsertResult.id), - }) - await contractConnectHandler()({ - contractId: Number(contractInsertResult.id), - reporterId: Number(reporterInsertResult.id), - }) - await reporterInsertHandler()({ - chain, - service: service, - privateKey: insertElement.reporter.walletPrivateKey, - address: insertElement.reporter.walletAddress, - oracleAddress: aggregatorData.address, - }) - await listenerInsertHandler()({ - chain, - service: service, - address: aggregatorData.address, - eventName, - }) - } - } - return wrapper -} - -export function bulkRemoveHandler() { - async function wrapper({ data }: { data }) { - const bulkData = data as IDatafeedBulk - - if (!checkBulkSource(data?.bulk)) { - console.error('invalid json src format') - return - } - - const listeners = await listenerListHandler()({}) - const reporters = await reporterListHandler()({}) - const delegatorReporters = await delegatorReporterListHandler()() - const delegatorContracts = await contractListHandler()() - const delegatorFunctions = await functionListHandler()() - - for (const removeElement of bulkData.bulk) { - console.log(`removing ${removeElement}`) - const adapterData = await readFileFromSource(removeElement.adapterSource) - if (!checkAdapterSource(adapterData)) { - console.error(`invalid adapterData: ${adapterData}, skipping removal`) - continue - } - const aggregatorData = await readFileFromSource(removeElement.aggregatorSource) - if (!checkAggregatorSource(aggregatorData)) { - console.error(`invalid aggregatorData: ${aggregatorData}, skipping removal`) - continue - } - - const listenerId = listeners.find((listener) => listener.address == aggregatorData.address).id - const reporterId = reporters.find( - (reporter) => reporter.address == removeElement.reporter.walletAddress, - ).id - - const delegatorReporterId = delegatorReporters.find( - (reporter) => - reporter.address.toLowerCase() == removeElement.reporter.walletAddress.toLowerCase(), - ).id - const delegatorContractId = delegatorContracts.find( - (contract) => contract.address.toLowerCase() == aggregatorData.address.toLowerCase(), - ).id - const functionId = delegatorFunctions.find( - (_function) => _function.address.toLowerCase() == aggregatorData.address.toLowerCase(), - ).id - - await listenerRemoveHandler()({ id: listenerId }) - await reporterRemoveHandler()({ id: reporterId }) - - await functionRemoveHandler()({ id: functionId }) - await contractRemoveHandler()({ id: delegatorContractId }) - await delegatorReporterRemoveHandler()({ id: delegatorReporterId }) - - // not removing adapter and aggregator since it's impossible to remove those without wiping out related rows from data table - // and leaving out adapter and aggregator in the table won't have that much impact on db so it'll be as it is - } - } - return wrapper -} - -export function bulkActivateHandler() { - async function wrapper({ data }: { data }) { - const bulkData = data as IDatafeedBulk - - const chain = bulkData?.chain || 'localhost' - const service = bulkData?.service || 'DATA_FEED' - - const fetcherHost = bulkData?.fetcherHost || FETCHER_HOST - const workerHost = bulkData?.workerHost || WORKER_SERVICE_HOST - const listenerHost = bulkData?.listenerHost || LISTENER_SERVICE_HOST - const reporterHost = bulkData?.reporterHost || REPORTER_SERVICE_HOST - - const fetcherPort = bulkData?.fetcherPort || FETCHER_PORT - const workerPort = bulkData?.workerPort || WORKER_SERVICE_PORT - const listenerPort = bulkData?.listenerPort || LISTENER_SERVICE_PORT - const reporterPort = bulkData?.reporterPort || REPORTER_SERVICE_PORT - - const listeners = await listenerListHandler()({ chain, service }) - const reporters = await reporterListHandler()({ chain, service }) - - if (!checkBulkSource(data?.bulk)) { - console.error(`invalid json src format`) - return - } - - for (const activateElement of data.bulk) { - const aggregatorData = await readFileFromSource(activateElement.aggregatorSource) - if (!checkAggregatorSource(aggregatorData)) { - console.error(`invalid aggregatorData: ${aggregatorData}, skipping activation`) - continue - } - - const reporterId = reporters.find( - (reporter) => - reporter.address.toLowerCase() == activateElement.reporter.walletAddress.toLowerCase(), - )?.id - if (!reporterId) { - console.error( - `reporterId not found for ${activateElement.reporter.walletAddress}, skipping activation`, - ) - continue - } - - const listenerId = listeners.find( - (listener) => listener.address == aggregatorData.address, - )?.id - if (!listenerId) { - console.error(`listenerId not found for ${aggregatorData.address}, skipping activation`) - continue - } - - try { - await fetcherStartHandler()({ - id: aggregatorData.aggregatorHash, - chain, - host: fetcherHost, - port: fetcherPort, - }) - await aggregatorActivateHandler()({ - aggregatorHash: aggregatorData.aggregatorHash, - host: workerHost, - port: workerPort, - }) - - await reporterActivateHandler()({ - id: Number(reporterId), - host: reporterHost, - port: reporterPort, - }) - await listenerActivateHandler()({ - id: Number(listenerId), - host: listenerHost, - port: listenerPort, - }) - } catch (e) { - console.error( - `activation failed for ${activateElement.aggregatorSource}, breaking iteration`, - ) - console.error(e?.response?.data) - break - } - } - } - return wrapper -} - -export function bulkDeactivateHandler() { - async function wrapper({ data }: { data }) { - const bulkData = data as IDatafeedBulk - - const chain = bulkData?.chain || 'localhost' - const service = bulkData?.service || 'DATA_FEED' - - const fetcherHost = bulkData?.fetcherHost || FETCHER_HOST - const workerHost = bulkData?.workerHost || WORKER_SERVICE_HOST - const listenerHost = bulkData?.listenerHost || LISTENER_SERVICE_HOST - const reporterHost = bulkData?.reporterHost || REPORTER_SERVICE_HOST - - const fetcherPort = bulkData?.fetcherPort || FETCHER_PORT - const workerPort = bulkData?.workerPort || WORKER_SERVICE_PORT - const listenerPort = bulkData?.listenerPort || LISTENER_SERVICE_PORT - const reporterPort = bulkData?.reporterPort || REPORTER_SERVICE_PORT - - const listeners = await listenerListHandler()({ chain, service }) - const reporters = await reporterListHandler()({ chain, service }) - - if (!checkBulkSource(data?.bulk)) { - console.error(`invalid json src format`) - return - } - - for (const deactivateElement of data.bulk) { - const aggregatorData = await readFileFromSource(deactivateElement.aggregatorSource) - if (!checkAggregatorSource(aggregatorData)) { - console.error(`invalid aggregatorData: ${aggregatorData}, skipping deactivation`) - continue - } - - const reporterId = reporters.find( - (reporter) => - reporter.address.toLowerCase() == deactivateElement.reporter.walletAddress.toLowerCase(), - )?.id - if (!reporterId) { - console.error( - `reporterId not found for ${deactivateElement.reporter.walletAddress}, skipping deactivation`, - ) - continue - } - - const listenerId = listeners.find( - (listener) => listener.address == aggregatorData.address, - )?.id - if (!listenerId) { - console.error(`listenerId not found for ${aggregatorData.address}, skipping deactivation`) - continue - } - - try { - await listenerDeactivateHandler()({ - id: Number(listenerId), - host: listenerHost, - port: listenerPort, - }) - - await reporterDeactivateHandler()({ - id: Number(reporterId), - host: reporterHost, - port: reporterPort, - }) - - await aggregatorDeactivateHandler()({ - aggregatorHash: aggregatorData.aggregatorHash, - host: workerHost, - port: workerPort, - }) - - await fetcherStopHandler()({ - id: aggregatorData.aggregatorHash, - chain, - host: fetcherHost, - port: fetcherPort, - }) - } catch (e) { - console.error( - `deactivation failed for ${deactivateElement.aggregatorSource}, breaking iteration`, - ) - console.error(e?.response?.data) - break - } - } - } - return wrapper -} - -function checkBulkSource(bulkData: IDatafeedBulkInsertElement[]) { - if (!bulkData || bulkData.length == 0) { - console.error('empty bulk insert data') - return false - } - for (const insertElement of bulkData) { - if (!isValidUrl(insertElement.adapterSource)) { - console.error(`${insertElement.adapterSource} is invalid url`) - return false - } - if (!isValidUrl(insertElement.aggregatorSource)) { - console.error(`${insertElement.aggregatorSource} is invalid url`) - } - if ( - !insertElement.reporter || - !insertElement.reporter.walletAddress || - !insertElement.reporter.walletPrivateKey - ) { - console.error(`${insertElement.reporter} is missing values`) - return false - } - } - return true -} - -function checkAdapterSource(adapterData: IAdapter) { - if (!adapterData.adapterHash) { - console.error(`adapterHash is empty`) - return false - } - if (!adapterData.name) { - console.error(`adapter name is empty`) - return false - } - if (!adapterData.decimals) { - console.error(`adapter decimals is empty`) - return false - } - if (!adapterData.feeds) { - console.error(`adapter feeds is empty`) - return false - } - return true -} - -function checkAggregatorSource(aggregatorData: IAggregator) { - if (!aggregatorData.aggregatorHash) { - console.error(`aggregatorHash is empty`) - return false - } - if (aggregatorData.active) { - console.error(`can't insert active aggregator`) - return false - } - if (!aggregatorData.name) { - console.error(`aggregatorData name is empty`) - return false - } - if (!aggregatorData.address) { - console.error(`aggregator address is empty`) - return false - } - if (!aggregatorData.heartbeat) { - console.error(`aggregator heartbeat is empty`) - return false - } - if (!aggregatorData.threshold) { - console.error(`aggregator threshold is empty`) - return false - } - if (!aggregatorData.absoluteThreshold) { - console.error(`aggregator absoluteThreshold is empty`) - return false - } - if (!aggregatorData.adapterHash) { - console.error(`aggregator adapterHash is empty`) - return false - } - return true -} diff --git a/cli/src/fetcher.ts b/cli/src/fetcher.ts deleted file mode 100644 index 63ffb7e96..000000000 --- a/cli/src/fetcher.ts +++ /dev/null @@ -1,152 +0,0 @@ -import axios from 'axios' -import { command, option, string as cmdstring, subcommands } from 'cmd-ts' -import { FETCHER_API_VERSION, FETCHER_HOST, FETCHER_PORT } from './settings.js' -import { buildFetcherUrl, buildUrl, isOraklFetcherHealthy } from './utils.js' - -export function fetcherSub() { - // fetcher active --host ${host} --port ${port} - // fetcher start --id ${aggregatorHash} --chain ${chain} --host ${host} --port ${port} - // fetcher stop --id ${aggregatorHash} --chain ${chain} --host ${host} --port ${port} - - const active = command({ - name: 'active', - args: { - host: option({ - type: cmdstring, - long: 'host', - defaultValue: () => FETCHER_HOST, - }), - port: option({ - type: cmdstring, - long: 'port', - defaultValue: () => String(FETCHER_PORT), - }), - }, - handler: activeHandler(), - }) - - const start = command({ - name: 'start', - args: { - id: option({ - type: cmdstring, - long: 'id', - }), - chain: option({ - type: cmdstring, - long: 'chain', - }), - host: option({ - type: cmdstring, - long: 'host', - defaultValue: () => FETCHER_HOST, - }), - port: option({ - type: cmdstring, - long: 'port', - defaultValue: () => String(FETCHER_PORT), - }), - }, - handler: startHandler(), - }) - - const stop = command({ - name: 'stop', - args: { - id: option({ - type: cmdstring, - long: 'id', - }), - chain: option({ - type: cmdstring, - long: 'chain', - }), - host: option({ - type: cmdstring, - long: 'host', - defaultValue: () => FETCHER_HOST, - }), - port: option({ - type: cmdstring, - long: 'port', - defaultValue: () => String(FETCHER_PORT), - }), - }, - handler: stopHandler(), - }) - - return subcommands({ - name: 'fetcher', - cmds: { active, start, stop }, - }) -} - -export function activeHandler() { - async function wrapper({ host, port }: { host: string; port: string }) { - const fetcherEndpoint = buildFetcherUrl(host, port, FETCHER_API_VERSION) - if (!(await isOraklFetcherHealthy(fetcherEndpoint))) return - - try { - const activeFetcherEndpoint = buildUrl(fetcherEndpoint, 'active') - const result = (await axios.get(activeFetcherEndpoint)).data - console.log(result) - } catch (e) { - console.error(e?.response?.data?.message) - } - } - return wrapper -} - -export function startHandler() { - async function wrapper({ - id, - chain, - host, - port, - }: { - id: string - chain: string - host: string - port: string - }) { - const fetcherEndpoint = buildFetcherUrl(host, port, FETCHER_API_VERSION) - if (!(await isOraklFetcherHealthy(fetcherEndpoint))) return - - try { - const endpoint = buildUrl(fetcherEndpoint, `start/${id}`) - const result = (await axios.get(endpoint, { data: { chain } })).data - console.log(result) - } catch (e) { - console.error(e?.response?.data, { depth: null }) - throw e - } - } - return wrapper -} - -export function stopHandler() { - async function wrapper({ - id, - chain, - host, - port, - }: { - id: string - chain: string - host: string - port: string - }) { - const fetcherEndpoint = buildFetcherUrl(host, port, FETCHER_API_VERSION) - if (!(await isOraklFetcherHealthy(fetcherEndpoint))) return - - try { - const endpoint = buildUrl(fetcherEndpoint, `stop/${id}`) - const result = (await axios.get(endpoint, { data: { chain } })).data - console.log(result) - } catch (e) { - console.error(e?.response?.data?.message) - throw e - } - } - return wrapper -} diff --git a/cli/src/index.ts b/cli/src/index.ts index 418ace9e9..d2a87d225 100644 --- a/cli/src/index.ts +++ b/cli/src/index.ts @@ -1,12 +1,8 @@ #!/usr/bin/env node --no-warnings import { binary, command, run, subcommands } from 'cmd-ts' -import { adapterSub } from './adapter.js' -import { aggregatorSub } from './aggregator.js' import { chainSub } from './chain.js' -import { datafeedSub } from './datafeed.js' import { delegatorSub } from './delegator.js' -import { fetcherSub } from './fetcher.js' import { listenerSub } from './listener.js' import { proxySub } from './proxy.js' import { reporterSub } from './reporter.js' @@ -18,13 +14,9 @@ async function main() { const service = serviceSub() const listener = listenerSub() const vrf = vrfSub() - const adapter = adapterSub() - const aggregator = aggregatorSub() - const fetcher = fetcherSub() const reporter = reporterSub() const delegator = delegatorSub() const proxy = proxySub() - const datafeed = datafeedSub() const version = command({ name: 'version', @@ -41,14 +33,10 @@ async function main() { service, listener, vrf, - adapter, - aggregator, - fetcher, reporter, version, delegator, proxy, - datafeed, }, }) diff --git a/cli/src/settings.ts b/cli/src/settings.ts index 8d76649f8..4163ccc4e 100644 --- a/cli/src/settings.ts +++ b/cli/src/settings.ts @@ -2,27 +2,23 @@ const production = process.env.NODE_ENV == 'production' const default_api_url = production ? 'http://orakl-api.orakl.svc.cluster.local' : 'http://localhost:3000/api/v1' + const default_delegator_url = production ? 'http://orakl-delegator.orakl.svc.cluster.local' : 'http://localhost:3002/api/v1' -const default_fetcher_host = production - ? 'http://orakl-fetcher.orakl.svc.cluster.local' - : 'http://localhost' -const default_fetcher_port = production ? '4040' : '3001' - const default_listener_host = production - ? 'http://aggregator-listener.orakl.svc.cluster.local' + ? 'http://vrf-listener.orakl.svc.cluster.local' : 'http://localhost' const default_listener_port = production ? '4000' : '4000' const default_worker_host = production - ? 'http://aggregator-worker.orakl.svc.cluster.local' + ? 'http://vrf-worker.orakl.svc.cluster.local' : 'http://localhost' const default_worker_port = production ? '5000' : '5001' const default_reporter_host = production - ? 'http://aggregator-reporter.orakl.svc.cluster.local' + ? 'http://vrf-reporter.orakl.svc.cluster.local' : 'http://localhost' const default_reporter_port = production ? '6000' : '6000' @@ -30,10 +26,6 @@ export const ORAKL_NETWORK_API_URL = process.env.ORAKL_NETWORK_API_URL || defaul export const ORAKL_NETWORK_DELEGATOR_URL = process.env.ORAKL_NETWORK_DELEGATOR_URL || default_delegator_url -export const FETCHER_HOST = process.env.FETCHER_HOST || default_fetcher_host -export const FETCHER_PORT = process.env.FETCHER_PORT || default_fetcher_port -export const FETCHER_API_VERSION = '/api/v1' - export const LISTENER_SERVICE_HOST = process.env.LISTENER_SERVICE_HOST || default_listener_host export const LISTENER_SERVICE_PORT = process.env.LISTENER_SERVICE_PORT || default_listener_port diff --git a/cli/src/utils.ts b/cli/src/utils.ts index dd18b4ba7..65d6c9993 100644 --- a/cli/src/utils.ts +++ b/cli/src/utils.ts @@ -64,28 +64,6 @@ export async function isOraklNetworkApiHealthy() { } } -export async function isOraklFetcherHealthy(url: string) { - if (!(await isValidUrl(url))) { - console.error('Invalid URL') - return false - } - - try { - const response = await axios.get(url) - if (response.status === 200) { - return true - } else { - console.error(`Orakl Network Fetcher [${url}] is down. HTTP Status: ${response.status}`) - return false - } - } catch (error) { - console.error( - `An error occurred while checking the Orakl Network Fetcher [${url}]: ${error.message}`, - ) - return false - } -} - export async function isOraklDelegatorHealthy() { try { return 200 === (await axios.get(ORAKL_NETWORK_DELEGATOR_URL))?.status diff --git a/cli/test/adapter.ts b/cli/test/adapter.ts deleted file mode 100644 index b7a916739..000000000 --- a/cli/test/adapter.ts +++ /dev/null @@ -1,47 +0,0 @@ -import { describe, expect, test } from '@jest/globals' -import { insertHandler, listHandler, removeHandler } from '../src/adapter' -import { ADAPTER_0, ADAPTER_1 } from './mockData' - -describe('CLI Adapter', function () { - let initalAdapterId - beforeAll(async () => { - // insert default adapter - const insertResult = await insertHandler()({ data: ADAPTER_0 }) - initalAdapterId = insertResult.id - }) - - afterAll(async () => { - const adapters = await listHandler()() - for (const adapter of adapters) { - await removeHandler()({ id: adapter.id }) - } - }) - - test('Should list Adapters', async function () { - const adapter = await listHandler()() - expect(adapter.length).toBeGreaterThan(0) - }) - - test('Should insert new adapter', async function () { - const adapterBefore = await listHandler()() - const result = await insertHandler()({ data: ADAPTER_1 }) - const adapterAfter = await listHandler()() - expect(adapterAfter.length).toEqual(adapterBefore.length + 1) - await removeHandler()({ id: result.id }) - }) - - test('Should not allow to insert the same adapter more than once', async function () { - await insertHandler()({ data: ADAPTER_1 }) - const msg = await insertHandler()({ data: ADAPTER_1 }) - expect(msg).toEqual( - 'ERROR: duplicate key value violates unique constraint "adapters_adapter_hash_key" (SQLSTATE 23505)', - ) - }) - - test('Should delete adapter based on id', async function () { - const adapterBefore = await listHandler()() - await removeHandler()({ id: initalAdapterId }) - const adapterAfter = await listHandler()() - expect(adapterAfter.length).toEqual(adapterBefore.length - 1) - }) -}) diff --git a/cli/test/aggregator.ts b/cli/test/aggregator.ts deleted file mode 100644 index ec42a8322..000000000 --- a/cli/test/aggregator.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { describe, expect, test } from '@jest/globals' -import { - insertHandler as adapterInsertHandler, - listHandler as adapterListHandler, - removeHandler as adapterRemoveHandler, -} from '../src/adapter' -import { insertHandler, listHandler, removeHandler } from '../src/aggregator' -import { - insertHandler as chainInsertHandler, - listHandler as chainListHandler, - removeHandler as chainRemoveHandler, -} from '../src/chain' -import { ADAPTER_0, ADAPTER_1, AGGREGATOR_0, AGGREGATOR_1 } from './mockData' - -describe('CLI Aggregator', function () { - let initialAggregatorId - beforeAll(async () => { - await chainInsertHandler()({ name: 'localhost' }) - await adapterInsertHandler()({ data: ADAPTER_0 }) - await adapterInsertHandler()({ data: ADAPTER_1 }) - - const insertResult = await insertHandler()({ data: AGGREGATOR_0, chain: 'localhost' }) - initialAggregatorId = insertResult.id - }) - - afterAll(async () => { - const aggregators = await listHandler()({}) - for (const aggregator of aggregators) { - await removeHandler()({ id: aggregator.id }) - } - const adapters = await adapterListHandler()() - for (const adapter of adapters) { - await adapterRemoveHandler()({ id: adapter.id }) - } - const chains = await chainListHandler()() - for (const chain of chains) { - await chainRemoveHandler()({ id: chain.id }) - } - }) - - test('Should list Aggregators', async function () { - const aggregator = await listHandler()({}) - expect(aggregator.length).toBeGreaterThan(0) - }) - - test('Should insert new aggregator', async function () { - const aggregatorBefore = await listHandler()({}) - const result = await insertHandler()({ data: AGGREGATOR_1, chain: 'localhost' }) - const aggregatorAfter = await listHandler()({}) - expect(aggregatorAfter.length).toEqual(aggregatorBefore.length + 1) - await removeHandler()({ id: result.id }) - }) - - test('Should not allow to insert the same aggregator more than once', async function () { - await insertHandler()({ data: AGGREGATOR_1, chain: 'localhost' }) - - const msg = await insertHandler()({ data: AGGREGATOR_1, chain: 'localhost' }) - expect(msg).toEqual( - 'ERROR: duplicate key value violates unique constraint "aggregators_address_key" (SQLSTATE 23505)', - ) - }) - - test('Should delete aggregator based on id', async function () { - const aggregatorBefore = await listHandler()({}) - await removeHandler()({ id: initialAggregatorId }) - const aggregatorAfter = await listHandler()({}) - expect(aggregatorAfter.length).toEqual(aggregatorBefore.length - 1) - }) -}) diff --git a/core/package.json b/core/package.json index acd8b8cfb..19f9889a4 100644 --- a/core/package.json +++ b/core/package.json @@ -36,14 +36,12 @@ "start:listener": "node --no-warnings --import=specifier-resolution-node/register --experimental-json-modules dist/listener/main.js", "start:listener:vrf": "yarn start:listener --service VRF", "start:listener:request_response": "yarn start:listener --service REQUEST_RESPONSE", - "start:listener:data_feed": "yarn start:listener --service DATA_FEED", "start:listener:data_feed_l2": "yarn start:listener --service DATA_FEED_L2", "start:listener:vrf_l2_request": "yarn start:listener --service VRF_L2_REQUEST", "start:listener:vrf_l2_fulfill": "yarn start:listener --service VRF_L2_FULFILL", "start:listener:rr_l2_request": "yarn start:listener --service REQUEST_RESPONSE_L2_REQUEST", "start:listener:rr_l2_fulfill": "yarn start:listener --service REQUEST_RESPONSE_L2_FULFILL", "start:worker": "node --no-warnings --import=specifier-resolution-node/register --experimental-json-modules dist/worker/main.js", - "start:worker:data_feed": "yarn start:worker --worker DATA_FEED", "start:worker:data_feed_l2": "yarn start:worker --worker DATA_FEED_L2", "start:worker:vrf": "yarn start:worker --worker VRF", "start:worker:vrf_l2_request": "yarn start:worker --worker VRF_L2_REQUEST", @@ -54,7 +52,6 @@ "start:reporter": "node --no-warnings --import=specifier-resolution-node/register --experimental-json-modules dist/reporter/main.js", "start:reporter:vrf": "yarn start:reporter --reporter VRF", "start:reporter:request_response": "yarn start:reporter --reporter REQUEST_RESPONSE", - "start:reporter:data_feed": "yarn start:reporter --reporter DATA_FEED", "start:por": "node --no-warnings --import=specifier-resolution-node/register --experimental-json-modules dist/por/main.js", "start:reporter:data_feed_l2": "yarn start:reporter --reporter DATA_FEED_L2", "start:reporter:vrf_l2_request": "yarn start:reporter --reporter VRF_L2_REQUEST", diff --git a/core/src/listener/data-feed.ts b/core/src/listener/data-feed.ts deleted file mode 100644 index 28f1de78d..000000000 --- a/core/src/listener/data-feed.ts +++ /dev/null @@ -1,103 +0,0 @@ -import { Aggregator__factory } from '@bisonai/orakl-contracts/v0.1' -import { Queue } from 'bullmq' -import { ethers } from 'ethers' -import { Logger } from 'pino' -import type { RedisClientType } from 'redis' -import { getOperatorAddress } from '../api' -import { - AGGREGATOR_QUEUE_SETTINGS, - BULLMQ_CONNECTION, - CHAIN, - DATA_FEED_LISTENER_STATE_NAME, - DATA_FEED_SERVICE_NAME, - DEPLOYMENT_NAME, - LISTENER_DATA_FEED_HISTORY_QUEUE_NAME, - LISTENER_DATA_FEED_LATEST_QUEUE_NAME, - LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME, - WORKER_AGGREGATOR_QUEUE_NAME, -} from '../settings' -import { IDataFeedListenerWorker, IListenerConfig, INewRound } from '../types' -import { buildSubmissionRoundJobId } from '../utils' -import { listenerService } from './listener' -import { ProcessEventOutputType } from './types' - -const FILE_NAME = import.meta.url - -export async function buildListener( - config: IListenerConfig[], - redisClient: RedisClientType, - logger: Logger, -) { - const stateName = DATA_FEED_LISTENER_STATE_NAME - const service = DATA_FEED_SERVICE_NAME - const chain = CHAIN - const eventName = 'NewRound' - const latestQueueName = LISTENER_DATA_FEED_LATEST_QUEUE_NAME - const historyQueueName = LISTENER_DATA_FEED_HISTORY_QUEUE_NAME - const processEventQueueName = LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME - const workerQueueName = WORKER_AGGREGATOR_QUEUE_NAME - const abi = Aggregator__factory.abi - const iface = new ethers.utils.Interface(abi) - - const latestListenerQueue = new Queue(latestQueueName, BULLMQ_CONNECTION) - const historyListenerQueue = new Queue(historyQueueName, BULLMQ_CONNECTION) - const processEventQueue = new Queue(processEventQueueName, BULLMQ_CONNECTION) - - await redisClient.set(stateName, JSON.stringify([])) - await latestListenerQueue.obliterate({ force: true }) - await historyListenerQueue.obliterate({ force: true }) - await processEventQueue.obliterate({ force: true }) - - listenerService({ - config, - abi, - stateName, - service, - chain, - eventName, - latestQueueName, - historyQueueName, - processEventQueueName, - workerQueueName, - processFn: await processEvent({ iface, logger }), - redisClient, - listenerInitType: 'latest', - logger, - }) -} - -async function processEvent({ iface, logger }: { iface: ethers.utils.Interface; logger: Logger }) { - const _logger = logger.child({ name: 'processEvent', file: FILE_NAME }) - - async function wrapper(log): Promise { - const eventData = iface.parseLog(log).args as unknown as INewRound - _logger.debug(eventData, 'eventData') - - const oracleAddress = log.address - const roundId = eventData.roundId.toNumber() - const operatorAddress = await getOperatorAddress({ oracleAddress, logger: _logger }) - - if (eventData.startedBy == operatorAddress) { - _logger.debug(`Ignore event emitted by ${eventData.startedBy} for round ${roundId}`) - } else { - // NewRound emitted by somebody else - const jobName = 'event' - - const jobId = buildSubmissionRoundJobId({ - oracleAddress, - roundId, - deploymentName: DEPLOYMENT_NAME, - }) - const jobData: IDataFeedListenerWorker = { - oracleAddress, - roundId, - workerSource: 'event', - } - _logger.debug(jobData, 'jobData') - - return { jobName, jobId, jobData, jobQueueSettings: AGGREGATOR_QUEUE_SETTINGS } - } - } - - return wrapper -} diff --git a/core/src/reporter/data-feed.ts b/core/src/reporter/data-feed.ts deleted file mode 100644 index a196cea87..000000000 --- a/core/src/reporter/data-feed.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { Queue } from 'bullmq' -import { Logger } from 'pino' -import type { RedisClientType } from 'redis' -import { - BAOBAB_CHAIN_ID, - BULLMQ_CONNECTION, - CYPRESS_CHAIN_ID, - DATA_FEED_REPORTER_CONCURRENCY, - DATA_FEED_REPORTER_STATE_NAME, - DATA_FEED_SERVICE_NAME, - PROVIDER, - REPORTER_AGGREGATOR_QUEUE_NAME, -} from '../settings' -import { factory } from './factory' - -export async function buildReporter(redisClient: RedisClientType, logger: Logger) { - const chainId = (await PROVIDER.getNetwork()).chainId - - const reporterAggregateQueue = new Queue(REPORTER_AGGREGATOR_QUEUE_NAME, BULLMQ_CONNECTION) - await reporterAggregateQueue.obliterate({ force: true }) - - await factory({ - redisClient, - stateName: DATA_FEED_REPORTER_STATE_NAME, - nonceManagerQueueName: 'NONCE_MANAGER_DATA_FEED_QUEUE_NAME', - service: DATA_FEED_SERVICE_NAME, - reporterQueueName: REPORTER_AGGREGATOR_QUEUE_NAME, - concurrency: DATA_FEED_REPORTER_CONCURRENCY, - delegatedFee: [BAOBAB_CHAIN_ID, CYPRESS_CHAIN_ID].includes(chainId) ? true : false, - _logger: logger, - }) -} diff --git a/core/src/worker/data-feed.ts b/core/src/worker/data-feed.ts deleted file mode 100644 index 93ac0d302..000000000 --- a/core/src/worker/data-feed.ts +++ /dev/null @@ -1,584 +0,0 @@ -import { Aggregator__factory } from '@bisonai/orakl-contracts/v0.1' -import { Job, Queue, Worker } from 'bullmq' -import { ethers } from 'ethers' -import { Logger } from 'pino' -import type { RedisClientType } from 'redis' -import { getOperatorAddress } from '../api' -import { OraklError, OraklErrorCode } from '../errors' -import { - AGGREGATOR_QUEUE_SETTINGS, - BULLMQ_CONNECTION, - CHAIN, - CHECK_HEARTBEAT_QUEUE_SETTINGS, - DATA_FEED_FULFILL_GAS_MINIMUM, - DATA_FEED_WORKER_STATE_NAME, - DEPLOYMENT_NAME, - HEARTBEAT_JOB_NAME, - HEARTBEAT_QUEUE_NAME, - HEARTBEAT_QUEUE_SETTINGS, - MAX_DATA_STALENESS, - PROVIDER, - REMOVE_ON_COMPLETE, - REPORTER_AGGREGATOR_QUEUE_NAME, - SUBMIT_HEARTBEAT_QUEUE_NAME, - SUBMIT_HEARTBEAT_QUEUE_SETTINGS, - WORKER_AGGREGATOR_QUEUE_NAME, - WORKER_CHECK_HEARTBEAT_QUEUE_NAME, - WORKER_DEVIATION_QUEUE_NAME, -} from '../settings' -import { - IAggregatorHeartbeatWorker, - IAggregatorSubmitHeartbeatWorker, - IDataFeedListenerWorker, - QueueType, -} from '../types' -import { buildHeartbeatJobId, buildSubmissionRoundJobId } from '../utils' -import { fetchDataFeedByAggregatorId, getAggregatorGivenAddress, getAggregators } from './api' -import { buildTransaction, isStale, oracleRoundStateCall } from './data-feed.utils' -import { State } from './state' -import { IDeviationData } from './types' -import { watchman } from './watchman' - -const FILE_NAME = import.meta.url - -/** - * Get all active aggregators, create their initial jobs, and submit - * them to the [heartbeat] queue. Launch [aggregator] and [heartbeat] - * workers. - * - * @param {RedisClientType} redis client - * @param {Logger} pino logger - */ -export async function worker(redisClient: RedisClientType, _logger: Logger) { - const logger = _logger.child({ name: 'worker', file: FILE_NAME }) - - // Queues - const aggregatorQueue = new Queue(WORKER_AGGREGATOR_QUEUE_NAME, BULLMQ_CONNECTION) - const heartbeatQueue = new Queue(HEARTBEAT_QUEUE_NAME, BULLMQ_CONNECTION) - const submitHeartbeatQueue = new Queue(SUBMIT_HEARTBEAT_QUEUE_NAME, BULLMQ_CONNECTION) - const reporterQueue = new Queue(REPORTER_AGGREGATOR_QUEUE_NAME, BULLMQ_CONNECTION) - const checkHeartbeatQueue = new Queue(WORKER_CHECK_HEARTBEAT_QUEUE_NAME, BULLMQ_CONNECTION) - - // Clear queues - await aggregatorQueue.obliterate({ force: true }) - await heartbeatQueue.obliterate({ force: true }) - await submitHeartbeatQueue.obliterate({ force: true }) - await checkHeartbeatQueue.obliterate({ force: true }) - - // Clear previous jobs from repeatable [checkHeartbeat] queue - const repeatableJobs = await checkHeartbeatQueue.getRepeatableJobs() - for (const job of repeatableJobs) { - await checkHeartbeatQueue.removeRepeatableByKey(job.key) - } - - const state = new State({ - redisClient, - stateName: DATA_FEED_WORKER_STATE_NAME, - heartbeatQueue, - submitHeartbeatQueue, - chain: CHAIN, - logger, - }) - await state.clear() - - const activeAggregators = await getAggregators({ chain: CHAIN, active: true, logger }) - if (activeAggregators.length == 0) { - logger.warn('No active aggregators') - } - - // Launch all active aggregators - for (const aggregator of activeAggregators) { - await state.add(aggregator.aggregatorHash) - } - - // [aggregator] worker - const aggregatorWorker = new Worker( - WORKER_AGGREGATOR_QUEUE_NAME, - aggregatorJob(submitHeartbeatQueue, reporterQueue, state, _logger), - { - ...BULLMQ_CONNECTION, - }, - ) - aggregatorWorker.on('error', (e) => { - logger.error(e) - }) - - // [heartbeat] worker - const heartbeatWorker = new Worker( - HEARTBEAT_QUEUE_NAME, - heartbeatJob(aggregatorQueue, state, _logger), - BULLMQ_CONNECTION, - ) - heartbeatWorker.on('error', (e) => { - logger.error(e) - }) - - // [submitHeartbeat] worker - const submitHeartbeatWorker = new Worker( - SUBMIT_HEARTBEAT_QUEUE_NAME, - submitHeartbeatJob(heartbeatQueue, state, _logger), - BULLMQ_CONNECTION, - ) - submitHeartbeatWorker.on('error', (e) => { - logger.error(e) - }) - - // [checkHeartbeat] worker - const checkHeartbeatWorker = new Worker( - WORKER_CHECK_HEARTBEAT_QUEUE_NAME, - checkHeartbeatJob(submitHeartbeatQueue, state, _logger), - BULLMQ_CONNECTION, - ) - checkHeartbeatWorker.on('error', (e) => { - logger.error(e) - }) - checkHeartbeatQueue.add('livecheck', {}, CHECK_HEARTBEAT_QUEUE_SETTINGS) - - //[deviation] worker - const deviationWorker = new Worker( - WORKER_DEVIATION_QUEUE_NAME, - deviationJob(reporterQueue, _logger), - BULLMQ_CONNECTION, - ) - deviationWorker.on('error', (e) => { - logger.error(e, 'deviation error') - }) - - const watchmanServer = await watchman({ state, logger }) - - async function handleExit() { - logger.info('Exiting. Wait for graceful shutdown.') - - await redisClient.quit() - await aggregatorWorker.close() - await heartbeatWorker.close() - await submitHeartbeatWorker.close() - await checkHeartbeatWorker.close() - await watchmanServer.close() - await deviationWorker.close() - } - process.on('SIGINT', handleExit) - process.on('SIGTERM', handleExit) -} - -/** - * [aggregator] worker receives both [event] and [heartbeat] - * jobs. {event} jobs are created by listener. {heartbeat} jobs are - * either created during a launch of a worker, or inside of a reporter. - * - * Worker accepts job, parses the request, fetches the latest - * aggregated data from the Orakl Network API for a specific - * aggregator, and communicated with Aggregator smart contract to find - * out the which round ID, it can submit the latest value. Then, it - * create a new job and passes it to reporter worker. - * - * @param {QueueType} submit heartbeat queue - * @param {QueueType} reporter queue - * @param {Logger} pino logger - * @return {} [aggregator] job processor - */ -export function aggregatorJob( - submitHeartbeatQueue: QueueType, - reporterQueue: QueueType, - state: State, - _logger: Logger, -) { - const logger = _logger.child({ name: 'aggregatorJob' }) - const iface = new ethers.utils.Interface(Aggregator__factory.abi) - - async function wrapper(job: Job) { - const inData: IDataFeedListenerWorker = job.data - logger.debug(inData, 'inData') - const { oracleAddress, roundId, workerSource } = inData - - if (!state.isActive({ oracleAddress })) { - logger.warn(`aggregatorJob for oracle ${oracleAddress} is no longer active. Removing job.`) - return - } - - try { - // TODO store in ephemeral state - const { id: aggregatorId, heartbeat: delay } = await getAggregatorGivenAddress({ - oracleAddress, - logger, - }) - - const { timestamp, value: submission } = await fetchDataFeedByAggregatorId({ - aggregatorId, - logger, - }) - logger.debug( - { aggregatorId, fetchedAt: timestamp, submission }, - `Latest data aggregate by aggregatorId`, - ) - - // Submit heartbeat - const outDataSubmitHeartbeat: IAggregatorSubmitHeartbeatWorker = { - oracleAddress, - delay, - } - logger.debug(outDataSubmitHeartbeat, 'outDataSubmitHeartbeat') - await submitHeartbeatQueue.add('aggregator-submission', outDataSubmitHeartbeat, { - ...SUBMIT_HEARTBEAT_QUEUE_SETTINGS, - }) - - if (!submission || isStale({ timestamp, logger })) { - logger.warn(`Data became stale (> ${MAX_DATA_STALENESS}). Not reporting.`) - } else { - const tx = buildTransaction({ - payloadParameters: { - roundId, - submission, - }, - to: oracleAddress, - gasMinimum: DATA_FEED_FULFILL_GAS_MINIMUM, - iface, - logger, - }) - logger.debug(tx, 'tx') - - await reporterQueue.add(workerSource, tx, { - jobId: buildSubmissionRoundJobId({ - oracleAddress, - roundId, - deploymentName: DEPLOYMENT_NAME, - }), - removeOnComplete: REMOVE_ON_COMPLETE, - // Reporter job can fail, and should be either retried or - // removed. We need to remove the job after repeated failure - // to prevent deadlock when running with a single node operator. - // After removing the job on failure, we can resubmit the job - // with the same unique ID representing the submission for - // specific aggregator on specific round. - // - // FIXME Rethink! - removeOnFail: true, - }) - - return tx - } - } catch (e) { - // `FailedToFetchFromDataFeed` exception can be raised from `prepareDataForReporter`. - // `aggregatorJob` is being triggered by either `fixed` or `event` worker. - // `event` job will not be resubmitted. `fixed` job might be - // resubmitted, however due to the nature of fixed job cycle, the - // resubmission might be delayed more than is acceptable. For this - // reason jobs processed with `aggregatorJob` job must be retried with - // appropriate logic. - logger.error(e) - throw e - } - } - - return wrapper -} - -/** - * [heartbeat] worker receives job either from the launch of the Data - * Feed Worker service, or from the Data Feed Reporter service. In - * both cases heartbeat job is delayed by the `heartbeat` amount of - * time specified in milliseconds. - * - * [heartbeat] job execution is independent of [event] job, however - * only one of them is eligible to submit to Aggregator smart contract - * for a specific round ID. - * - * At first, [heartbeat] worker finds out what round is currently - * accepting submissions given an operator address extracted from - * associated aggregator address. Then, it creates a new job with a - * unique ID denoting the request for report on a specific - * round. Finally, it submits the job to the [aggregator] worker. The - * job ID is created with the same format as in Data Feed Listener - * service, which protects the [aggregator] worker from processing the - * same request twice. - * - * @param {Queue} aggregator queue - * @param {State} ephemeral aggregator state - * @param {Logger} pino logger - * @return {} [heartbeat] job processor - */ -function heartbeatJob(aggregatorQueue: Queue, state: State, _logger: Logger) { - const logger = _logger.child({ name: 'heartbeatJob' }) - - async function wrapper(job: Job) { - const inData: IAggregatorHeartbeatWorker = job.data - const oracleAddress = inData.oracleAddress - - try { - logger.debug({ oracleAddress }) - - // [hearbeat] worker can be controlled by watchman which can - // either activate or deactive a [heartbeat] job. When - // [heartbeat] job cannot be found in a local aggregator state, - // the job is assumed to be terminated, and worker will drop any - // incoming job that should be performed on aggregator denoted - // by `aggregatorAddress`. - if (!state.isActive({ oracleAddress })) { - logger.warn(`Heartbeat job for oracle ${oracleAddress} is no longer active. Exiting.`) - return 0 - } - - const operatorAddress = await getOperatorAddress({ oracleAddress, logger }) - const oracleRoundState = await oracleRoundStateCall({ - oracleAddress, - operatorAddress, - logger, - provider: PROVIDER, - }) - logger.debug(oracleRoundState, 'oracleRoundState') - - const { roundId, eligibleToSubmit } = oracleRoundState - const outData: IDataFeedListenerWorker = { - oracleAddress, - roundId, - workerSource: 'heartbeat', - } - logger.debug(outData, 'outData') - - if (eligibleToSubmit) { - logger.debug({ job: 'added', eligible: true, roundId }, 'before-eligible-fixed') - - const jobId = buildSubmissionRoundJobId({ - oracleAddress, - roundId, - deploymentName: DEPLOYMENT_NAME, - }) - - // [heartbeat] worker is executed at predefined intervals and - // is of vital importance for repeated submission to - // Aggregator smart contract. [heartbeat] worker is not executed - // earlier than N miliseconds (also called as a heartbeat) after - // the latest submission. If the Aggregator smart contract - // tells us that we are eligible to submit to `roundId`, it - // means that reporter has not submitted any value there yet. - // It also means there was no submission in the last N milliseconds. - // If we happen to be at that situation, we assume there is - // a deadlock and the Orakl Network Reporter service failed on - // to submit on particular `roundId`. - await removeAggregatorDeadlock(aggregatorQueue, jobId, logger) - - await aggregatorQueue.add('fixed', outData, { - jobId, - removeOnComplete: REMOVE_ON_COMPLETE, - ...AGGREGATOR_QUEUE_SETTINGS, - }) - logger.debug({ job: 'added', eligible: true, roundId }, 'eligible-fixed') - } else { - const msg = `Non-eligible to submit for oracle ${oracleAddress} with operator ${operatorAddress}` - throw new OraklError(OraklErrorCode.NonEligibleToSubmit, msg) - } - } catch (e) { - const msg = `Heartbeat job for oracle ${oracleAddress} died.` - logger.error(msg) - logger.error(e) - throw e - } - } - - return wrapper -} - -/** - * Reported job might have been requested by [event] worker or - * [deviation] worker before the end of heartbeat delay. If that is - * the case, there is still waiting delayed heartbeat job in the - * heartbeat queue. If that is the case, we remove it. Then, we submit - * the new heartbeat job. - * - * @param {Queue} [heartbeat] queue - * @param {State} ephemeral aggregator state - * @param {Logger} pino logger - * @return {} [submitHeartbeat] job processor - */ -function submitHeartbeatJob(heartbeatQueue: Queue, state: State, _logger: Logger) { - const logger = _logger.child({ name: 'submitHeartbeatJob' }) - - async function wrapper(job: Job) { - const inData: IAggregatorSubmitHeartbeatWorker = job.data - const oracleAddress = inData.oracleAddress - const delay = inData.delay - - const jobId = buildHeartbeatJobId({ oracleAddress, deploymentName: DEPLOYMENT_NAME }) - const allDelayed = (await heartbeatQueue.getJobs(['delayed'])).filter( - (job) => job.opts.jobId == jobId, - ) - - if (!state.isActive({ oracleAddress })) { - logger.warn( - `submitHeartbeatJob for oracle ${oracleAddress} is no longer active. Removing job.`, - ) - return - } - - if (allDelayed.length > 1) { - throw new OraklError( - OraklErrorCode.UnexpectedNumberOfJobsInQueue, - `Number of jobs ${allDelayed.length}`, - ) - } else if (allDelayed.length == 1) { - const delayedJob = allDelayed[0] - await delayedJob.remove() - - logger.debug({ job: 'deleted' }, `Reporter deleted heartbeat job with ID=${jobId}`) - } - - const outData: IAggregatorHeartbeatWorker = { - oracleAddress, - } - await heartbeatQueue.add(HEARTBEAT_JOB_NAME, outData, { - jobId, - delay, - ...HEARTBEAT_QUEUE_SETTINGS, - }) - - await state.updateTimestamp(oracleAddress) - - logger.debug({ job: 'added', delay }, `Reporter submitted heartbeat job with ID=${jobId}`) - } - - return wrapper -} - -/** - * [checkHeartbeat] job is executed in regular intervals to check - * whether any of active heartbeat aggregators died. If any of - * heartbeat jobs has died, resubmit the [heartbeat] job. - * - * @param {Queue} [submitHeartbeat] queue - * @param {State} ephemeral aggregator state - * @param {Logger} pino logger - */ -function checkHeartbeatJob(submitHeartbeatQueue: Queue, state: State, _logger: Logger) { - const logger = _logger.child({ name: 'checkHeartbeatJob' }) - - async function wrapper(_job: Job) { - const activeAggregators = await state.active() - for (const aggregator of activeAggregators) { - const timeBuffer = 2_000 - const heartbeatDeadline = aggregator.timestamp + aggregator.heartbeat + timeBuffer - const isDead = Date.now() > heartbeatDeadline ? true : false - - // Resubmit heartbeat when dead - if (isDead) { - const oracleAddress = aggregator.address - logger.warn(`Aggregator heartbeat job for oracle ${oracleAddress} found dead.`) - const outDataSubmitHeartbeat: IAggregatorSubmitHeartbeatWorker = { - oracleAddress, - delay: aggregator.heartbeat, - } - logger.debug(outDataSubmitHeartbeat, 'outDataSubmitHeartbeat') - await submitHeartbeatQueue.add('checkHeartbeat-submission', outDataSubmitHeartbeat, { - ...SUBMIT_HEARTBEAT_QUEUE_SETTINGS, - }) - logger.info(`Aggregater heartbeat job for oracle ${oracleAddress} resubmitted.`) - } - } - } - - return wrapper -} - -/** - * Remove aggregator deadlock: The job has already been requested and - * accepted from the other end of queue, however, the job might not - * have been accomplished successfully there. The function deletes the - * previously submitted job, so it can be resubmitted again. - * - * Note: This function should be called only when we are certain that - * there is any deadlock. Deadlock detection is not part of this - * function. - * - * @param {queue} aggregator queue - * @param {string} job ID - * @param {Logger} pino logger - * @return {void} - * @except {OraklErrorCode.UnexpectedNumberOfDeadlockJobs} raise when - * more than single deadlock found - */ -async function removeAggregatorDeadlock(aggregatorQueue: Queue, jobId: string, logger: Logger) { - const blockingJob = (await aggregatorQueue.getJobs(['completed'])).filter( - (job) => job.opts.jobId == jobId, - ) - - if (blockingJob.length == 1) { - blockingJob[0].remove() - logger.warn(`Removed blocking job with ID ${jobId}`) - } else if (blockingJob.length > 1) { - throw new OraklError( - OraklErrorCode.UnexpectedNumberOfDeadlockJobs, - `Found ${blockingJob.length} blocking jobs. Expected 1 at most.`, - ) - } -} - -/** - * [deviation] worker receives [fetcher] job - * - * Worker accepts job, parses the request and communicated with Aggregator smart contract to find - * out the which round ID, it can submit the latest value. Then, it - * create a new job and passes it to reporter worker. - * - * @param {QueueType} submit heartbeat queue - * @param {QueueType} reporter queue - * @param {Logger} pino logger - * @return {} [deviation] job processor - */ -export function deviationJob(reporterQueue: QueueType, _logger: Logger) { - const logger = _logger.child({ name: 'deviationJob' }) - const iface = new ethers.utils.Interface(Aggregator__factory.abi) - - async function wrapper(job: Job) { - const inData: IDeviationData = job.data - logger.debug(inData, 'inData') - const { timestamp, submission, oracleAddress } = inData - const operatorAddress = await getOperatorAddress({ oracleAddress, logger }) - const oracleRoundState = await oracleRoundStateCall({ - oracleAddress, - operatorAddress, - logger, - provider: PROVIDER, - }) - logger.debug(oracleRoundState, 'oracleRoundState') - - const { roundId } = oracleRoundState - try { - const { aggregatorHash } = await getAggregatorGivenAddress({ - oracleAddress, - logger, - }) - logger.debug({ aggregatorHash, fetchedAt: timestamp, submission }, 'Latest data aggregate') - if (isStale({ timestamp, logger })) { - logger.warn(`Data became stale (> ${MAX_DATA_STALENESS}). Not reporting.`) - } else { - const tx = buildTransaction({ - payloadParameters: { - roundId, - submission, - }, - to: oracleAddress, - gasMinimum: DATA_FEED_FULFILL_GAS_MINIMUM, - iface, - logger, - }) - logger.debug(tx, 'tx') - - await reporterQueue.add('deviation', tx, { - jobId: buildSubmissionRoundJobId({ - oracleAddress, - roundId, - deploymentName: DEPLOYMENT_NAME, - }), - removeOnComplete: REMOVE_ON_COMPLETE, - removeOnFail: true, - }) - - return tx - } - } catch (e) { - logger.error(e) - throw e - } - } - - return wrapper -} diff --git a/core/src/worker/data-feed.utils.ts b/core/src/worker/data-feed.utils.ts deleted file mode 100644 index 9604a78a8..000000000 --- a/core/src/worker/data-feed.utils.ts +++ /dev/null @@ -1,134 +0,0 @@ -import { Aggregator__factory } from '@bisonai/orakl-contracts/v0.1' -import { ethers } from 'ethers' -import { Logger } from 'pino' -import { MAX_DATA_STALENESS, PROVIDER } from '../settings' -import { IDataFeedTransactionParameters, IOracleRoundState, IRoundData } from '../types' - -/** - * Compute the number of seconds until the next round. - * - * @param {string} oracle address - * @param {number} heartbeat - * @param {Logger} - * @return {number} delay in seconds until the next round - */ -export async function getSynchronizedDelay({ - oracleAddress, - heartbeat, - logger, -}: { - oracleAddress: string - heartbeat: number - logger: Logger -}): Promise { - logger.debug('getSynchronizedDelay') - - const startedAt = await currentRoundStartedAtCall({ - oracleAddress, - logger, - }) - - let delay: number - - if (startedAt != 0) { - const blockTimestamp = (await PROVIDER.getBlock('latest')).timestamp - delay = heartbeat - Math.max(0, (blockTimestamp - startedAt) % heartbeat) - } else { - delay = 0 // The first round -> No need to wait. - } - - logger.debug({ heartbeat, delay, startedAt }) - return delay -} - -async function currentRoundStartedAtCall({ - oracleAddress, - logger, -}: { - oracleAddress: string - logger?: Logger -}) { - logger?.debug({ oracleAddress }, 'currentRoundStartedAtCall') - const aggregator = new ethers.Contract(oracleAddress, Aggregator__factory.abi, PROVIDER) - const startedAt = await aggregator.currentRoundStartedAt() - logger?.debug({ startedAt }, 'startedAt') - return startedAt -} - -export async function oracleRoundStateCall({ - oracleAddress, - operatorAddress, - logger, - roundId, - provider, -}: { - oracleAddress: string - operatorAddress: string - roundId?: number - logger?: Logger - provider: ethers.providers.JsonRpcProvider -}): Promise { - logger?.debug({ oracleAddress, operatorAddress }, 'oracleRoundStateCall') - const aggregator = new ethers.Contract(oracleAddress, Aggregator__factory.abi, provider) - - let queriedRoundId = 0 - if (roundId) { - queriedRoundId = roundId - } - - const state = await aggregator.oracleRoundState(operatorAddress, queriedRoundId) - return { - eligibleToSubmit: state._eligibleToSubmit, - roundId: state._roundId, - latestSubmission: state._latestSubmission, - startedAt: state._startedAt, - timeout: state._timeout, - availableFunds: state._availableFunds, - oracleCount: state._oracleCount, - paymentAmount: state._paymentAmount, - } -} - -export async function getRoundDataCall({ - oracleAddress, - roundId, -}: { - oracleAddress: string - roundId: number -}): Promise { - const aggregator = new ethers.Contract(oracleAddress, Aggregator__factory.abi, PROVIDER) - return await aggregator.getRoundData(roundId) -} - -export function isStale({ timestamp, logger }: { timestamp: string; logger: Logger }) { - const now = Date.now() - const fetchedAt = Date.parse(timestamp) - const dataStaleness = Math.max(0, now - fetchedAt) - logger.debug(`Data staleness ${dataStaleness} ms`) - return dataStaleness > MAX_DATA_STALENESS -} - -export function buildTransaction({ - payloadParameters, - to, - gasMinimum, - iface, - logger, -}: { - payloadParameters: IDataFeedTransactionParameters - to: string - gasMinimum: number - iface: ethers.utils.Interface - logger: Logger -}) { - const { roundId, submission } = payloadParameters - const payload = iface.encodeFunctionData('submit', [roundId, submission]) - const gasLimit = gasMinimum - const tx = { - payload, - gasLimit, - to, - } - logger.debug(tx) - return tx -} diff --git a/core/test/data-feed-worker.test.ts b/core/test/data-feed-worker.test.ts deleted file mode 100644 index 624830051..000000000 --- a/core/test/data-feed-worker.test.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { Aggregator__factory } from '@bisonai/orakl-contracts/v0.1' -import { ethers } from 'ethers' -import { buildMockLogger } from '../src/logger' -import { DATA_FEED_FULFILL_GAS_MINIMUM } from '../src/settings' -import { buildTransaction } from '../src/worker/data-feed.utils' - -describe('Data Feed Worker', function () { - it('Data Feed Build Transaction', async function () { - const logger = buildMockLogger() - const oracleAddress = '0xccf9a654c878848991e46ab23d2ad055ca827979' // random address - const iface = new ethers.utils.Interface(Aggregator__factory.abi) - - const tx = buildTransaction({ - payloadParameters: { - roundId: 10, - submission: BigInt(123), - }, - to: oracleAddress, - gasMinimum: DATA_FEED_FULFILL_GAS_MINIMUM, - iface, - logger, - }) - - expect(tx?.payload).toBe( - '0x202ee0ed000000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000007b', - ) - expect(tx?.gasLimit).toBe(DATA_FEED_FULFILL_GAS_MINIMUM) - expect(tx?.to).toBe(oracleAddress) - }) -})