Skip to content

Commit

Permalink
merge processQ into latest and history Qs
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed May 21, 2024
1 parent 832aeb7 commit 830b489
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 76 deletions.
3 changes: 0 additions & 3 deletions core/src/listener/data-feed-L2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
L2_DATA_FEED_SERVICE_NAME,
L2_LISTENER_DATA_FEED_HISTORY_QUEUE_NAME,
L2_LISTENER_DATA_FEED_LATEST_QUEUE_NAME,
L2_LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME,
L2_WORKER_AGGREGATOR_QUEUE_NAME
} from '../settings'
import { IAnswerUpdated, IDataFeedListenerWorkerL2, IListenerConfig } from '../types'
Expand All @@ -30,7 +29,6 @@ export async function buildListener(
const eventName = 'AnswerUpdated'
const latestQueueName = L2_LISTENER_DATA_FEED_LATEST_QUEUE_NAME
const historyQueueName = L2_LISTENER_DATA_FEED_HISTORY_QUEUE_NAME
const processEventQueueName = L2_LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME
const workerQueueName = L2_WORKER_AGGREGATOR_QUEUE_NAME
const abi = Aggregator__factory.abi
const iface = new ethers.utils.Interface(abi)
Expand All @@ -44,7 +42,6 @@ export async function buildListener(
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn: await processEvent({ iface, logger }),
redisClient,
Expand Down
1 change: 0 additions & 1 deletion core/src/listener/data-feed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ export async function buildListener(
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn: await processEvent({ iface, logger }),
redisClient,
Expand Down
113 changes: 60 additions & 53 deletions core/src/listener/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Logger } from 'pino'
import type { RedisClientType } from 'redis'
import { BULLMQ_CONNECTION, getObservedBlockRedisKey, LISTENER_JOB_SETTINGS } from '../settings'
import { IListenerConfig } from '../types'
import { getListenerObservedBlock } from './api'
import { upsertListenerObservedBlock } from './api'
import { State } from './state'
import {
IHistoryListenerJob,
Expand Down Expand Up @@ -51,7 +51,6 @@ export async function listenerService({
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn,
redisClient,
Expand All @@ -65,15 +64,13 @@ export async function listenerService({
eventName: string
latestQueueName: string
historyQueueName: string
processEventQueueName: string
workerQueueName: string
processFn: (log: ethers.Event) => Promise<ProcessEventOutputType | undefined>
redisClient: RedisClientType
logger: Logger
}) {
const latestListenerQueue = new Queue(latestQueueName, BULLMQ_CONNECTION)
const historyListenerQueue = new Queue(historyQueueName, BULLMQ_CONNECTION)
const processEventQueue = new Queue(processEventQueueName, BULLMQ_CONNECTION)
const workerQueue = new Queue(workerQueueName, BULLMQ_CONNECTION)

const state = new State({
Expand All @@ -94,8 +91,9 @@ export async function listenerService({
latestJob({
state,
historyListenerQueue,
processEventQueue,
workerQueue,
redisClient,
processFn,
logger
}),
BULLMQ_CONNECTION
Expand All @@ -106,22 +104,13 @@ export async function listenerService({

const historyWorker = new Worker(
historyQueueName,
historyJob({ state, processEventQueue, logger }),
historyJob({ state, logger, processFn, redisClient, workerQueue }),
BULLMQ_CONNECTION
)
historyWorker.on('error', (e) => {
logger.error(e)
})

const processEventWorker = new Worker(
processEventQueueName,
processEventJob({ workerQueue, processFn, logger }),
BULLMQ_CONNECTION
)
processEventWorker.on('error', (e) => {
logger.error(e)
})

for (const listener of config) {
await state.add(listener.id)
}
Expand All @@ -133,9 +122,8 @@ export async function listenerService({

await latestWorker.close()
await historyWorker.close()
await processEventWorker.close()
await state.clear()
await watchmanServer.close()
watchmanServer.close()
await redisClient.quit()
}
process.on('SIGINT', handleExit)
Expand Down Expand Up @@ -166,15 +154,17 @@ export async function listenerService({
*/
function latestJob({
state,
processEventQueue,
workerQueue,
historyListenerQueue,
redisClient,
processFn,
logger
}: {
state: State
processEventQueue: Queue
historyListenerQueue: Queue
redisClient: RedisClientType
workerQueue: Queue
processFn: (log: ethers.Event) => Promise<ProcessEventOutputType | undefined>
logger: Logger
}) {
async function wrapper(job: Job) {
Expand All @@ -200,12 +190,7 @@ function latestJob({
try {
// We assume that observedBlock has been properly set in the db within
// `State.add` method call.
observedBlock = (
await getListenerObservedBlock({
blockKey: observedBlockRedisKey,
logger: this.logger
})
)['blockNumber']
observedBlock = Number(await redisClient.get(observedBlockRedisKey))
} catch (e) {
// Similarly to the failure during fetching the latest block
// number, this error doesn't require job resubmission. The next
Expand All @@ -219,22 +204,31 @@ function latestJob({
const logPrefix = generateListenerLogPrefix(contractAddress, observedBlock, latestBlock)
try {
if (latestBlock > observedBlock) {
await redisClient.set(observedBlockRedisKey, latestBlock)

const lockObservedBlock = observedBlock + 1
// The `observedBlock` block number is already processed,
// therefore we do not need to re-query the same event in such
// block again.
const events = await state.queryEvent(contractAddress, observedBlock + 1, latestBlock)
for (const [index, event] of events.entries()) {
const outData: IProcessEventListenerJob = {
contractAddress,
event
for (let blockNumber = lockObservedBlock; blockNumber <= latestBlock; ++blockNumber) {
const events = await state.queryEvent(contractAddress, blockNumber, blockNumber)
for (const [_, event] of events.entries()) {
const jobMetadata = await processFn(event)
if (jobMetadata) {
const { jobId, jobName, jobData, jobQueueSettings } = jobMetadata
const queueSettings = jobQueueSettings ? jobQueueSettings : LISTENER_JOB_SETTINGS
await workerQueue.add(jobName, jobData, {
jobId,
...queueSettings
})
logger.debug(`Listener submitted job [${jobId}] for [${jobName}]`)
}
}
const jobId = getUniqueEventIdentifier(event, index)
await processEventQueue.add('latest', outData, {
jobId,
...LISTENER_JOB_SETTINGS
await upsertListenerObservedBlock({
blockKey: observedBlockRedisKey,
blockNumber,
logger: this.logger
})
await redisClient.set(observedBlockRedisKey, blockNumber)
observedBlock += 1 // in case of failure, dont add processed blocks to history queue
}
logger.debug(logPrefix)
} else {
Expand Down Expand Up @@ -277,12 +271,16 @@ function latestJob({
*/
function historyJob({
state,
processEventQueue,
logger
logger,
processFn,
redisClient,
workerQueue
}: {
state: State
processEventQueue: Queue
logger: Logger
redisClient: RedisClientType
workerQueue: Queue
processFn: (log: ethers.Event) => Promise<ProcessEventOutputType | undefined>
}) {
async function wrapper(job: Job) {
const inData: IHistoryListenerJob = job.data
Expand All @@ -291,25 +289,34 @@ function historyJob({

let events: ethers.Event[] = []
try {
const observedBlockRedisKey = getObservedBlockRedisKey(contractAddress)
const observedBlock = Number(await redisClient.get(observedBlockRedisKey))
events = await state.queryEvent(contractAddress, blockNumber, blockNumber)
for (const [_, event] of events.entries()) {
const jobMetadata = await processFn(event)
if (jobMetadata) {
const { jobId, jobName, jobData, jobQueueSettings } = jobMetadata
const queueSettings = jobQueueSettings ? jobQueueSettings : LISTENER_JOB_SETTINGS
await workerQueue.add(jobName, jobData, {
jobId,
...queueSettings
})
logger.debug(`Listener submitted job [${jobId}] for [${jobName}]`)
}
}

if (blockNumber > observedBlock) {
await upsertListenerObservedBlock({
blockKey: observedBlockRedisKey,
blockNumber,
logger
})
await redisClient.set(observedBlockRedisKey, blockNumber)
}
} catch (e) {
logger.error(`${logPrefix} hist fail`)
throw e
}

logger.debug(`${logPrefix} hist`)

for (const [index, event] of events.entries()) {
const outData: IProcessEventListenerJob = {
contractAddress,
event
}
const jobId = getUniqueEventIdentifier(event, index)
await processEventQueue.add('history', outData, {
jobId,
...LISTENER_JOB_SETTINGS
})
}
}

return wrapper
Expand Down
3 changes: 0 additions & 3 deletions core/src/listener/request-response-L2-fulfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
L1_ENDPOINT,
L2_LISTENER_REQUEST_RESPONSE_FULFILL_HISTORY_QUEUE_NAME,
L2_LISTENER_REQUEST_RESPONSE_FULFILL_LATEST_QUEUE_NAME,
L2_LISTENER_REQUEST_RESPONSE_FULFILL_PROCESS_EVENT_QUEUE_NAME,
L2_REQUEST_RESPONSE_FULFILL_LISTENER_STATE_NAME,
L2_REQUEST_RESPONSE_FULFILL_SERVICE_NAME,
L2_WORKER_REQUEST_RESPONSE_FULFILL_QUEUE_NAME
Expand All @@ -34,7 +33,6 @@ export async function buildListener(
const eventName = 'DataRequestFulfilled'
const latestQueueName = L2_LISTENER_REQUEST_RESPONSE_FULFILL_LATEST_QUEUE_NAME
const historyQueueName = L2_LISTENER_REQUEST_RESPONSE_FULFILL_HISTORY_QUEUE_NAME
const processEventQueueName = L2_LISTENER_REQUEST_RESPONSE_FULFILL_PROCESS_EVENT_QUEUE_NAME
const workerQueueName = L2_WORKER_REQUEST_RESPONSE_FULFILL_QUEUE_NAME
const abi = L1Endpoint__factory.abi
const iface = new ethers.utils.Interface(abi)
Expand All @@ -48,7 +46,6 @@ export async function buildListener(
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn: await processEvent({ iface, logger }),
redisClient,
Expand Down
3 changes: 0 additions & 3 deletions core/src/listener/request-response-L2-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
L1_ENDPOINT,
L2_LISTENER_REQUEST_RESPONSE_REQUEST_HISTORY_QUEUE_NAME,
L2_LISTENER_REQUEST_RESPONSE_REQUEST_LATEST_QUEUE_NAME,
L2_LISTENER_REQUEST_RESPONSE_REQUEST_PROCESS_EVENT_QUEUE_NAME,
L2_REQUEST_RESPONSE_REQUEST_LISTENER_STATE_NAME,
L2_REQUEST_RESPONSE_REQUEST_SERVICE_NAME,
L2_WORKER_REQUEST_RESPONSE_REQUEST_QUEUE_NAME
Expand All @@ -29,7 +28,6 @@ export async function buildListener(
const eventName = 'DataRequested'
const latestQueueName = L2_LISTENER_REQUEST_RESPONSE_REQUEST_LATEST_QUEUE_NAME
const historyQueueName = L2_LISTENER_REQUEST_RESPONSE_REQUEST_HISTORY_QUEUE_NAME
const processEventQueueName = L2_LISTENER_REQUEST_RESPONSE_REQUEST_PROCESS_EVENT_QUEUE_NAME
const workerQueueName = L2_WORKER_REQUEST_RESPONSE_REQUEST_QUEUE_NAME
const abi = L2Endpoint__factory.abi
const iface = new ethers.utils.Interface(abi)
Expand All @@ -43,7 +41,6 @@ export async function buildListener(
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn: await processEvent({ iface, logger }),
redisClient,
Expand Down
3 changes: 0 additions & 3 deletions core/src/listener/request-response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
CHAIN,
LISTENER_REQUEST_RESPONSE_HISTORY_QUEUE_NAME,
LISTENER_REQUEST_RESPONSE_LATEST_QUEUE_NAME,
LISTENER_REQUEST_RESPONSE_PROCESS_EVENT_QUEUE_NAME,
REQUEST_RESPONSE_LISTENER_STATE_NAME,
REQUEST_RESPONSE_SERVICE_NAME,
WORKER_REQUEST_RESPONSE_QUEUE_NAME
Expand All @@ -28,7 +27,6 @@ export async function buildListener(
const eventName = 'DataRequested'
const latestQueueName = LISTENER_REQUEST_RESPONSE_LATEST_QUEUE_NAME
const historyQueueName = LISTENER_REQUEST_RESPONSE_HISTORY_QUEUE_NAME
const processEventQueueName = LISTENER_REQUEST_RESPONSE_PROCESS_EVENT_QUEUE_NAME
const workerQueueName = WORKER_REQUEST_RESPONSE_QUEUE_NAME
const abi = RequestResponseCoordinator__factory.abi
const iface = new ethers.utils.Interface(abi)
Expand All @@ -42,7 +40,6 @@ export async function buildListener(
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn: await processEvent({ iface, logger }),
redisClient,
Expand Down
3 changes: 2 additions & 1 deletion core/src/listener/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,12 @@ export class State {
update observedBlock to latestBlock - 1 since all blocks in between will be handled
by history queue and worker
*/
await upsertListenerObservedBlock({
upsertListenerObservedBlock({
blockKey: observedBlockRedisKey,
blockNumber: Math.max(latestBlock - 1, 0),
logger: this.logger
})
this.redisClient.set(observedBlockRedisKey, Math.max(latestBlock - 1, 0))

for (let blockNumber = observedBlock; blockNumber < latestBlock; ++blockNumber) {
const historyOutData: IHistoryListenerJob = {
Expand Down
3 changes: 0 additions & 3 deletions core/src/listener/vrf-L2-fulfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
L2_ENDPOINT,
L2_LISTENER_VRF_FULFILL_HISTORY_QUEUE_NAME,
L2_LISTENER_VRF_FULFILL_LATEST_QUEUE_NAME,
L2_LISTENER_VRF_FULFILL_PROCESS_EVENT_QUEUE_NAME,
L2_VRF_FULFILL_LISTENER_STATE_NAME,
L2_VRF_FULFILL_SERVICE_NAME,
L2_WORKER_VRF_FULFILL_QUEUE_NAME
Expand All @@ -29,7 +28,6 @@ export async function buildListener(
const eventName = 'RandomWordFulfilled'
const latestQueueName = L2_LISTENER_VRF_FULFILL_LATEST_QUEUE_NAME
const historyQueueName = L2_LISTENER_VRF_FULFILL_HISTORY_QUEUE_NAME
const processEventQueueName = L2_LISTENER_VRF_FULFILL_PROCESS_EVENT_QUEUE_NAME
const workerQueueName = L2_WORKER_VRF_FULFILL_QUEUE_NAME
const abi = L1Endpoint__factory.abi
const iface = new ethers.utils.Interface(abi)
Expand All @@ -43,7 +41,6 @@ export async function buildListener(
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn: await processEvent({ iface, logger }),
redisClient,
Expand Down
3 changes: 0 additions & 3 deletions core/src/listener/vrf-L2-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
L1_ENDPOINT,
L2_LISTENER_VRF_REQUEST_HISTORY_QUEUE_NAME,
L2_LISTENER_VRF_REQUEST_LATEST_QUEUE_NAME,
L2_LISTENER_VRF_REQUEST_PROCESS_EVENT_QUEUE_NAME,
L2_VRF_REQUEST_LISTENER_STATE_NAME,
L2_VRF_REQUEST_SERVICE_NAME,
L2_WORKER_VRF_REQUEST_QUEUE_NAME
Expand All @@ -30,7 +29,6 @@ export async function buildListener(
const eventName = 'RandomWordsRequested'
const latestQueueName = L2_LISTENER_VRF_REQUEST_LATEST_QUEUE_NAME
const historyQueueName = L2_LISTENER_VRF_REQUEST_HISTORY_QUEUE_NAME
const processEventQueueName = L2_LISTENER_VRF_REQUEST_PROCESS_EVENT_QUEUE_NAME
const workerQueueName = L2_WORKER_VRF_REQUEST_QUEUE_NAME
const abi = L2Endpoint__factory.abi
const iface = new ethers.utils.Interface(abi)
Expand All @@ -44,7 +42,6 @@ export async function buildListener(
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn: await processEvent({ iface, logger }),
redisClient,
Expand Down
Loading

0 comments on commit 830b489

Please sign in to comment.