Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed May 22, 2024
1 parent f82a8f8 commit 63eb1c3
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 67 deletions.
73 changes: 8 additions & 65 deletions core/src/listener/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,9 @@ import { BULLMQ_CONNECTION, getObservedBlockRedisKey, LISTENER_JOB_SETTINGS } fr
import { IListenerConfig } from '../types'
import { upsertListenerObservedBlock } from './api'
import { State } from './state'
import {
IHistoryListenerJob,
ILatestListenerJob,
IProcessEventListenerJob,
ProcessEventOutputType
} from './types'
import { IHistoryListenerJob, ILatestListenerJob, ProcessEventOutputType } from './types'
import { watchman } from './watchman'

const FILE_NAME = import.meta.url

/**
* The listener service is used for tracking events emmitted by smart
* contracts. Tracked events are subsequently send to BullMQ queue to
Expand Down Expand Up @@ -188,7 +181,7 @@ function latestJob({
}

try {
// We assume that observedBlock has been properly set in the db within
// We assume that redis cache has been initialized within
// `State.add` method call.
observedBlock = Number(await redisClient.get(observedBlockRedisKey))
} catch (e) {
Expand All @@ -201,6 +194,11 @@ function latestJob({
throw e
}

if (latestBlock < observedBlock) {
logger.warn('latestBlock < observedBlock. Updating observed block to revert the condition.')
observedBlock = Math.max(0, latestBlock - 1)
}

const logPrefix = generateListenerLogPrefix(contractAddress, observedBlock, latestBlock)
try {
if (latestBlock > observedBlock) {
Expand Down Expand Up @@ -287,11 +285,10 @@ function historyJob({
const { contractAddress, blockNumber } = inData
const logPrefix = generateListenerLogPrefix(contractAddress, blockNumber, blockNumber)

let events: ethers.Event[] = []
try {
const observedBlockRedisKey = getObservedBlockRedisKey(contractAddress)
const observedBlock = Number(await redisClient.get(observedBlockRedisKey))
events = await state.queryEvent(contractAddress, blockNumber, blockNumber)
const events = await state.queryEvent(contractAddress, blockNumber, blockNumber)
for (const [_, event] of events.entries()) {
const jobMetadata = await processFn(event)
if (jobMetadata) {
Expand Down Expand Up @@ -322,60 +319,6 @@ function historyJob({
return wrapper
}

/**
* The [processEvent] listener worker accepts jobs from [processEvent]
* queue. The jobs are submitted either by the [latest] or [history]
* listener worker.
*
* @param {(log: ethers.Event) => Promise<ProcessEventOutputType | undefined>} function that processes event caught by listener
*/
function processEventJob({
workerQueue,
processFn,
logger
}: {
workerQueue: Queue
processFn: (log: ethers.Event) => Promise<ProcessEventOutputType | undefined>
logger: Logger
}) {
const _logger = logger.child({ name: 'processEventJob', file: FILE_NAME })

async function wrapper(job: Job) {
const inData: IProcessEventListenerJob = job.data
const { event } = inData
_logger.debug(event, 'event')

try {
const jobMetadata = await processFn(event)
if (jobMetadata) {
const { jobId, jobName, jobData, jobQueueSettings } = jobMetadata
const queueSettings = jobQueueSettings ? jobQueueSettings : LISTENER_JOB_SETTINGS
await workerQueue.add(jobName, jobData, {
jobId,
...queueSettings
})
_logger.debug(`Listener submitted job [${jobId}] for [${jobName}]`)
}
} catch (e) {
_logger.error(e, 'Error in user defined listener processing function')
throw e
}
}

return wrapper
}

/**
* Auxiliary function to create a unique identifier for a give `event`
* and `index` of the even within the transaction.
*
* @param {ethers.Event} event
* @param {number} index of event within a transaction
*/
function getUniqueEventIdentifier(event: ethers.Event, index: number) {
return `${event.blockNumber}-${event.transactionHash}-${index}`
}

/**
* Auxiliary function that generate a consisten log prefix, that is
* used both by the [latest] and [history] listener worker.
Expand Down
4 changes: 2 additions & 2 deletions core/src/listener/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ export class State {
update observedBlock to latestBlock - 1 since all blocks in between will be handled
by history queue and worker
*/
upsertListenerObservedBlock({
await upsertListenerObservedBlock({
blockKey: observedBlockRedisKey,
blockNumber: Math.max(latestBlock - 1, 0),
logger: this.logger
})
this.redisClient.set(observedBlockRedisKey, Math.max(latestBlock - 1, 0))
await this.redisClient.set(observedBlockRedisKey, Math.max(latestBlock - 1, 0))

for (let blockNumber = observedBlock; blockNumber < latestBlock; ++blockNumber) {
const historyOutData: IHistoryListenerJob = {
Expand Down

0 comments on commit 63eb1c3

Please sign in to comment.