Skip to content

Commit

Permalink
pass all unprocessed blocks to latestQ when listener starts
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed May 22, 2024
1 parent 63eb1c3 commit 5a70839
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 28 deletions.
17 changes: 11 additions & 6 deletions core/src/listener/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ function latestJob({

try {
// We assume that redis cache has been initialized within
// `State.add` method call.
// `State.add` method call and observedBlock has already been processed
observedBlock = Number(await redisClient.get(observedBlockRedisKey))
} catch (e) {
// Similarly to the failure during fetching the latest block
Expand All @@ -199,13 +199,12 @@ function latestJob({
observedBlock = Math.max(0, latestBlock - 1)
}

const logPrefix = generateListenerLogPrefix(contractAddress, observedBlock, latestBlock)
try {
if (latestBlock > observedBlock) {
const lockObservedBlock = observedBlock + 1
// The `observedBlock` block number is already processed,
// therefore we do not need to re-query the same event in such
// block again.
const lockObservedBlock = observedBlock + 1
for (let blockNumber = lockObservedBlock; blockNumber <= latestBlock; ++blockNumber) {
const events = await state.queryEvent(contractAddress, blockNumber, blockNumber)
for (const [_, event] of events.entries()) {
Expand All @@ -228,16 +227,22 @@ function latestJob({
await redisClient.set(observedBlockRedisKey, blockNumber)
observedBlock += 1 // in case of failure, dont add processed blocks to history queue
}
logger.debug(logPrefix)
logger.debug(
`${generateListenerLogPrefix(contractAddress, lockObservedBlock, observedBlock)} success`
)
} else {
logger.debug(`${logPrefix} noop`)
logger.debug(
`${generateListenerLogPrefix(contractAddress, observedBlock, latestBlock)} noop`
)
}
} catch (e) {
// Querying the latest events or passing data to [process] queue
// failed. Repeateable [latest] job will continue listening for
// new blocks, and the blocks which failed to be scanned for
// events will be retried through [history] job.
logger.warn(`${logPrefix} fail`)
logger.warn(
`${generateListenerLogPrefix(contractAddress, observedBlock + 1, latestBlock)} fail`
)

for (let blockNumber = observedBlock + 1; blockNumber <= latestBlock; ++blockNumber) {
const outData: IHistoryListenerJob = { contractAddress, blockNumber }
Expand Down
37 changes: 15 additions & 22 deletions core/src/listener/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from '../settings'
import { IListenerConfig, IListenerRawConfig } from '../types'
import { getListenerObservedBlock, getListeners, upsertListenerObservedBlock } from './api'
import { IContracts, IHistoryListenerJob, ILatestListenerJob } from './types'
import { IContracts, ILatestListenerJob } from './types'
import { postprocessListeners } from './utils'

const FILE_NAME = import.meta.url
Expand Down Expand Up @@ -174,33 +174,26 @@ export class State {

const contractAddress = toAddListener.address
const observedBlockRedisKey = getObservedBlockRedisKey(contractAddress)
const observedBlockMetadata = await getListenerObservedBlock({
blockKey: observedBlockRedisKey,
logger: this.logger
})
const latestBlock = await this.latestBlockNumber()
const observedBlock =
observedBlockMetadata.blockKey === '' ? latestBlock : observedBlockMetadata.blockNumber

/**
update observedBlock to latestBlock - 1 since all blocks in between will be handled
by history queue and worker
*/
await upsertListenerObservedBlock({
const { blockKey: observedBlockKey } = await getListenerObservedBlock({
blockKey: observedBlockRedisKey,
blockNumber: Math.max(latestBlock - 1, 0),
logger: this.logger
})
await this.redisClient.set(observedBlockRedisKey, Math.max(latestBlock - 1, 0))

for (let blockNumber = observedBlock; blockNumber < latestBlock; ++blockNumber) {
const historyOutData: IHistoryListenerJob = {
contractAddress,
blockNumber
}
await this.historyListenerQueue.add('history', historyOutData, {
...LISTENER_JOB_SETTINGS
/**
when listener starts, there are two options:
* latest observedBlock (key) exists -> do nothing
(start latest job, it'll handle multiple blocks
between observedBlock and latestBlock)
* it does not exist -> upsert latestBlock
*/
if (observedBlockKey === '') {
await upsertListenerObservedBlock({
blockKey: observedBlockRedisKey,
blockNumber: Math.max(latestBlock - 1, 0),
logger: this.logger
})
await this.redisClient.set(observedBlockRedisKey, Math.max(latestBlock - 1, 0))
}

// Insert listener jobs
Expand Down

0 comments on commit 5a70839

Please sign in to comment.