diff --git a/core/src/listener/data-feed-L2.ts b/core/src/listener/data-feed-L2.ts index d252e9156..92d2543aa 100644 --- a/core/src/listener/data-feed-L2.ts +++ b/core/src/listener/data-feed-L2.ts @@ -10,7 +10,6 @@ import { L2_DATA_FEED_SERVICE_NAME, L2_LISTENER_DATA_FEED_HISTORY_QUEUE_NAME, L2_LISTENER_DATA_FEED_LATEST_QUEUE_NAME, - L2_LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME, L2_WORKER_AGGREGATOR_QUEUE_NAME } from '../settings' import { IAnswerUpdated, IDataFeedListenerWorkerL2, IListenerConfig } from '../types' @@ -30,7 +29,6 @@ export async function buildListener( const eventName = 'AnswerUpdated' const latestQueueName = L2_LISTENER_DATA_FEED_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_DATA_FEED_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_AGGREGATOR_QUEUE_NAME const abi = Aggregator__factory.abi const iface = new ethers.utils.Interface(abi) @@ -44,7 +42,6 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, diff --git a/core/src/listener/data-feed.ts b/core/src/listener/data-feed.ts index 18af3fee4..7026acb75 100644 --- a/core/src/listener/data-feed.ts +++ b/core/src/listener/data-feed.ts @@ -57,7 +57,6 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, diff --git a/core/src/listener/listener.ts b/core/src/listener/listener.ts index 0727faf74..e811977d4 100644 --- a/core/src/listener/listener.ts +++ b/core/src/listener/listener.ts @@ -4,7 +4,7 @@ import { Logger } from 'pino' import type { RedisClientType } from 'redis' import { BULLMQ_CONNECTION, getObservedBlockRedisKey, LISTENER_JOB_SETTINGS } from '../settings' import { IListenerConfig } from '../types' -import { getListenerObservedBlock } from './api' +import { upsertListenerObservedBlock } from './api' import { State } from './state' import { IHistoryListenerJob, @@ -51,7 +51,6 @@ export async function listenerService({ eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn, redisClient, @@ -65,7 +64,6 @@ export async function listenerService({ eventName: string latestQueueName: string historyQueueName: string - processEventQueueName: string workerQueueName: string processFn: (log: ethers.Event) => Promise redisClient: RedisClientType @@ -73,7 +71,6 @@ export async function listenerService({ }) { const latestListenerQueue = new Queue(latestQueueName, BULLMQ_CONNECTION) const historyListenerQueue = new Queue(historyQueueName, BULLMQ_CONNECTION) - const processEventQueue = new Queue(processEventQueueName, BULLMQ_CONNECTION) const workerQueue = new Queue(workerQueueName, BULLMQ_CONNECTION) const state = new State({ @@ -94,8 +91,9 @@ export async function listenerService({ latestJob({ state, historyListenerQueue, - processEventQueue, + workerQueue, redisClient, + processFn, logger }), BULLMQ_CONNECTION @@ -106,22 +104,13 @@ export async function listenerService({ const historyWorker = new Worker( historyQueueName, - historyJob({ state, processEventQueue, logger }), + historyJob({ state, logger, processFn, redisClient, workerQueue }), BULLMQ_CONNECTION ) historyWorker.on('error', (e) => { logger.error(e) }) - const processEventWorker = new Worker( - processEventQueueName, - processEventJob({ workerQueue, processFn, logger }), - BULLMQ_CONNECTION - ) - processEventWorker.on('error', (e) => { - logger.error(e) - }) - for (const listener of config) { await state.add(listener.id) } @@ -133,9 +122,8 @@ export async function listenerService({ await latestWorker.close() await historyWorker.close() - await processEventWorker.close() await state.clear() - await watchmanServer.close() + watchmanServer.close() await redisClient.quit() } process.on('SIGINT', handleExit) @@ -166,15 +154,17 @@ export async function listenerService({ */ function latestJob({ state, - processEventQueue, + workerQueue, historyListenerQueue, redisClient, + processFn, logger }: { state: State - processEventQueue: Queue historyListenerQueue: Queue redisClient: RedisClientType + workerQueue: Queue + processFn: (log: ethers.Event) => Promise logger: Logger }) { async function wrapper(job: Job) { @@ -200,12 +190,7 @@ function latestJob({ try { // We assume that observedBlock has been properly set in the db within // `State.add` method call. - observedBlock = ( - await getListenerObservedBlock({ - blockKey: observedBlockRedisKey, - logger: this.logger - }) - )['blockNumber'] + observedBlock = Number(await redisClient.get(observedBlockRedisKey)) } catch (e) { // Similarly to the failure during fetching the latest block // number, this error doesn't require job resubmission. The next @@ -219,22 +204,31 @@ function latestJob({ const logPrefix = generateListenerLogPrefix(contractAddress, observedBlock, latestBlock) try { if (latestBlock > observedBlock) { - await redisClient.set(observedBlockRedisKey, latestBlock) - + const lockObservedBlock = observedBlock + 1 // The `observedBlock` block number is already processed, // therefore we do not need to re-query the same event in such // block again. - const events = await state.queryEvent(contractAddress, observedBlock + 1, latestBlock) - for (const [index, event] of events.entries()) { - const outData: IProcessEventListenerJob = { - contractAddress, - event + for (let blockNumber = lockObservedBlock; blockNumber <= latestBlock; ++blockNumber) { + const events = await state.queryEvent(contractAddress, blockNumber, blockNumber) + for (const [_, event] of events.entries()) { + const jobMetadata = await processFn(event) + if (jobMetadata) { + const { jobId, jobName, jobData, jobQueueSettings } = jobMetadata + const queueSettings = jobQueueSettings ? jobQueueSettings : LISTENER_JOB_SETTINGS + await workerQueue.add(jobName, jobData, { + jobId, + ...queueSettings + }) + logger.debug(`Listener submitted job [${jobId}] for [${jobName}]`) + } } - const jobId = getUniqueEventIdentifier(event, index) - await processEventQueue.add('latest', outData, { - jobId, - ...LISTENER_JOB_SETTINGS + await upsertListenerObservedBlock({ + blockKey: observedBlockRedisKey, + blockNumber, + logger: this.logger }) + await redisClient.set(observedBlockRedisKey, blockNumber) + observedBlock += 1 // in case of failure, dont add processed blocks to history queue } logger.debug(logPrefix) } else { @@ -277,12 +271,16 @@ function latestJob({ */ function historyJob({ state, - processEventQueue, - logger + logger, + processFn, + redisClient, + workerQueue }: { state: State - processEventQueue: Queue logger: Logger + redisClient: RedisClientType + workerQueue: Queue + processFn: (log: ethers.Event) => Promise }) { async function wrapper(job: Job) { const inData: IHistoryListenerJob = job.data @@ -291,25 +289,34 @@ function historyJob({ let events: ethers.Event[] = [] try { + const observedBlockRedisKey = getObservedBlockRedisKey(contractAddress) + const observedBlock = Number(await redisClient.get(observedBlockRedisKey)) events = await state.queryEvent(contractAddress, blockNumber, blockNumber) + for (const [_, event] of events.entries()) { + const jobMetadata = await processFn(event) + if (jobMetadata) { + const { jobId, jobName, jobData, jobQueueSettings } = jobMetadata + const queueSettings = jobQueueSettings ? jobQueueSettings : LISTENER_JOB_SETTINGS + await workerQueue.add(jobName, jobData, { + jobId, + ...queueSettings + }) + logger.debug(`Listener submitted job [${jobId}] for [${jobName}]`) + } + } + + if (blockNumber > observedBlock) { + await upsertListenerObservedBlock({ + blockKey: observedBlockRedisKey, + blockNumber, + logger + }) + await redisClient.set(observedBlockRedisKey, blockNumber) + } } catch (e) { logger.error(`${logPrefix} hist fail`) throw e } - - logger.debug(`${logPrefix} hist`) - - for (const [index, event] of events.entries()) { - const outData: IProcessEventListenerJob = { - contractAddress, - event - } - const jobId = getUniqueEventIdentifier(event, index) - await processEventQueue.add('history', outData, { - jobId, - ...LISTENER_JOB_SETTINGS - }) - } } return wrapper diff --git a/core/src/listener/request-response-L2-fulfill.ts b/core/src/listener/request-response-L2-fulfill.ts index 45eee7f30..ea998e2c0 100644 --- a/core/src/listener/request-response-L2-fulfill.ts +++ b/core/src/listener/request-response-L2-fulfill.ts @@ -7,7 +7,6 @@ import { L1_ENDPOINT, L2_LISTENER_REQUEST_RESPONSE_FULFILL_HISTORY_QUEUE_NAME, L2_LISTENER_REQUEST_RESPONSE_FULFILL_LATEST_QUEUE_NAME, - L2_LISTENER_REQUEST_RESPONSE_FULFILL_PROCESS_EVENT_QUEUE_NAME, L2_REQUEST_RESPONSE_FULFILL_LISTENER_STATE_NAME, L2_REQUEST_RESPONSE_FULFILL_SERVICE_NAME, L2_WORKER_REQUEST_RESPONSE_FULFILL_QUEUE_NAME @@ -34,7 +33,6 @@ export async function buildListener( const eventName = 'DataRequestFulfilled' const latestQueueName = L2_LISTENER_REQUEST_RESPONSE_FULFILL_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_REQUEST_RESPONSE_FULFILL_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_REQUEST_RESPONSE_FULFILL_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_REQUEST_RESPONSE_FULFILL_QUEUE_NAME const abi = L1Endpoint__factory.abi const iface = new ethers.utils.Interface(abi) @@ -48,7 +46,6 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, diff --git a/core/src/listener/request-response-L2-request.ts b/core/src/listener/request-response-L2-request.ts index 788fd5853..2aa47b0df 100644 --- a/core/src/listener/request-response-L2-request.ts +++ b/core/src/listener/request-response-L2-request.ts @@ -7,7 +7,6 @@ import { L1_ENDPOINT, L2_LISTENER_REQUEST_RESPONSE_REQUEST_HISTORY_QUEUE_NAME, L2_LISTENER_REQUEST_RESPONSE_REQUEST_LATEST_QUEUE_NAME, - L2_LISTENER_REQUEST_RESPONSE_REQUEST_PROCESS_EVENT_QUEUE_NAME, L2_REQUEST_RESPONSE_REQUEST_LISTENER_STATE_NAME, L2_REQUEST_RESPONSE_REQUEST_SERVICE_NAME, L2_WORKER_REQUEST_RESPONSE_REQUEST_QUEUE_NAME @@ -29,7 +28,6 @@ export async function buildListener( const eventName = 'DataRequested' const latestQueueName = L2_LISTENER_REQUEST_RESPONSE_REQUEST_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_REQUEST_RESPONSE_REQUEST_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_REQUEST_RESPONSE_REQUEST_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_REQUEST_RESPONSE_REQUEST_QUEUE_NAME const abi = L2Endpoint__factory.abi const iface = new ethers.utils.Interface(abi) @@ -43,7 +41,6 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, diff --git a/core/src/listener/request-response.ts b/core/src/listener/request-response.ts index ea3202136..fbb12811d 100644 --- a/core/src/listener/request-response.ts +++ b/core/src/listener/request-response.ts @@ -6,7 +6,6 @@ import { CHAIN, LISTENER_REQUEST_RESPONSE_HISTORY_QUEUE_NAME, LISTENER_REQUEST_RESPONSE_LATEST_QUEUE_NAME, - LISTENER_REQUEST_RESPONSE_PROCESS_EVENT_QUEUE_NAME, REQUEST_RESPONSE_LISTENER_STATE_NAME, REQUEST_RESPONSE_SERVICE_NAME, WORKER_REQUEST_RESPONSE_QUEUE_NAME @@ -28,7 +27,6 @@ export async function buildListener( const eventName = 'DataRequested' const latestQueueName = LISTENER_REQUEST_RESPONSE_LATEST_QUEUE_NAME const historyQueueName = LISTENER_REQUEST_RESPONSE_HISTORY_QUEUE_NAME - const processEventQueueName = LISTENER_REQUEST_RESPONSE_PROCESS_EVENT_QUEUE_NAME const workerQueueName = WORKER_REQUEST_RESPONSE_QUEUE_NAME const abi = RequestResponseCoordinator__factory.abi const iface = new ethers.utils.Interface(abi) @@ -42,7 +40,6 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, diff --git a/core/src/listener/state.ts b/core/src/listener/state.ts index 3906c2a81..c4e7dca75 100644 --- a/core/src/listener/state.ts +++ b/core/src/listener/state.ts @@ -186,11 +186,12 @@ export class State { update observedBlock to latestBlock - 1 since all blocks in between will be handled by history queue and worker */ - await upsertListenerObservedBlock({ + upsertListenerObservedBlock({ blockKey: observedBlockRedisKey, blockNumber: Math.max(latestBlock - 1, 0), logger: this.logger }) + this.redisClient.set(observedBlockRedisKey, Math.max(latestBlock - 1, 0)) for (let blockNumber = observedBlock; blockNumber < latestBlock; ++blockNumber) { const historyOutData: IHistoryListenerJob = { diff --git a/core/src/listener/vrf-L2-fulfill.ts b/core/src/listener/vrf-L2-fulfill.ts index a76ab0ea6..af5fb9af5 100644 --- a/core/src/listener/vrf-L2-fulfill.ts +++ b/core/src/listener/vrf-L2-fulfill.ts @@ -7,7 +7,6 @@ import { L2_ENDPOINT, L2_LISTENER_VRF_FULFILL_HISTORY_QUEUE_NAME, L2_LISTENER_VRF_FULFILL_LATEST_QUEUE_NAME, - L2_LISTENER_VRF_FULFILL_PROCESS_EVENT_QUEUE_NAME, L2_VRF_FULFILL_LISTENER_STATE_NAME, L2_VRF_FULFILL_SERVICE_NAME, L2_WORKER_VRF_FULFILL_QUEUE_NAME @@ -29,7 +28,6 @@ export async function buildListener( const eventName = 'RandomWordFulfilled' const latestQueueName = L2_LISTENER_VRF_FULFILL_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_VRF_FULFILL_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_VRF_FULFILL_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_VRF_FULFILL_QUEUE_NAME const abi = L1Endpoint__factory.abi const iface = new ethers.utils.Interface(abi) @@ -43,7 +41,6 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, diff --git a/core/src/listener/vrf-L2-request.ts b/core/src/listener/vrf-L2-request.ts index ef7b6982f..d020efcb7 100644 --- a/core/src/listener/vrf-L2-request.ts +++ b/core/src/listener/vrf-L2-request.ts @@ -8,7 +8,6 @@ import { L1_ENDPOINT, L2_LISTENER_VRF_REQUEST_HISTORY_QUEUE_NAME, L2_LISTENER_VRF_REQUEST_LATEST_QUEUE_NAME, - L2_LISTENER_VRF_REQUEST_PROCESS_EVENT_QUEUE_NAME, L2_VRF_REQUEST_LISTENER_STATE_NAME, L2_VRF_REQUEST_SERVICE_NAME, L2_WORKER_VRF_REQUEST_QUEUE_NAME @@ -30,7 +29,6 @@ export async function buildListener( const eventName = 'RandomWordsRequested' const latestQueueName = L2_LISTENER_VRF_REQUEST_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_VRF_REQUEST_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_VRF_REQUEST_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_VRF_REQUEST_QUEUE_NAME const abi = L2Endpoint__factory.abi const iface = new ethers.utils.Interface(abi) @@ -44,7 +42,6 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, diff --git a/core/src/listener/vrf.ts b/core/src/listener/vrf.ts index a525354d0..4cd3c1e37 100644 --- a/core/src/listener/vrf.ts +++ b/core/src/listener/vrf.ts @@ -7,7 +7,6 @@ import { CHAIN, LISTENER_VRF_HISTORY_QUEUE_NAME, LISTENER_VRF_LATEST_QUEUE_NAME, - LISTENER_VRF_PROCESS_EVENT_QUEUE_NAME, VRF_LISTENER_STATE_NAME, VRF_SERVICE_NAME, WORKER_VRF_QUEUE_NAME @@ -29,7 +28,6 @@ export async function buildListener( const eventName = 'RandomWordsRequested' const latestQueueName = LISTENER_VRF_LATEST_QUEUE_NAME const historyQueueName = LISTENER_VRF_HISTORY_QUEUE_NAME - const processEventQueueName = LISTENER_VRF_PROCESS_EVENT_QUEUE_NAME const workerQueueName = WORKER_VRF_QUEUE_NAME const abi = VRFCoordinator__factory.abi const iface = new ethers.utils.Interface(abi) @@ -43,7 +41,6 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient,