Skip to content

Commit

Permalink
rearrange execution sequence in state.add()
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed May 28, 2024
1 parent c7496e3 commit ce513da
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions core/src/listener/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,18 @@ export class State {
await this.redisClient.set(this.stateName, JSON.stringify(updatedActiveListeners))

const contractAddress = toAddListener.address
const contract = new ethers.Contract(contractAddress, this.abi, this.provider)
this.contracts = { ...this.contracts, [contractAddress]: contract }

const latestBlock = await this.latestBlockNumber()
const observedBlock = await getObservedBlock({ service: this.service })

// fetch all unprocessed blocks and pass to the history queue
const unprocessedBlocks = await getUnprocessedBlocks({ service: this.service })
for (const block of unprocessedBlocks) {
await this.addBlockToHistoryQueue(contractAddress, block.blockNumber)
}

// if there is no observedBlock record in the db,
// use the listenerInitType to determine how to initialize the listener
if (observedBlock.service === '') {
Expand All @@ -201,27 +210,21 @@ export class State {

default:
// [block number] initialization
for (let blockNumber = this.listenerInitType; blockNumber < latestBlock; ++blockNumber) {
for (let blockNumber = this.listenerInitType; blockNumber <= latestBlock; ++blockNumber) {
await this.addBlockToHistoryQueue(contractAddress, blockNumber)
}
await upsertObservedBlock({ service: this.service, blockNumber: latestBlock - 1 })
await upsertObservedBlock({ service: this.service, blockNumber: latestBlock })
break
}
} else {
for (
let blockNumber = observedBlock.blockNumber + 1;
blockNumber < latestBlock;
blockNumber <= latestBlock;
++blockNumber
) {
await this.addBlockToHistoryQueue(contractAddress, blockNumber)
}
await upsertObservedBlock({ service: this.service, blockNumber: latestBlock - 1 })
}

// fetch all unprocessed blocks and pass to the history queue
const unprocessedBlocks = await getUnprocessedBlocks({ service: this.service })
for (const block of unprocessedBlocks) {
await this.addBlockToHistoryQueue(contractAddress, block.blockNumber)
await upsertObservedBlock({ service: this.service, blockNumber: latestBlock })
}

// Insert listener jobs
Expand All @@ -236,9 +239,6 @@ export class State {
}
})

const contract = new ethers.Contract(contractAddress, this.abi, this.provider)
this.contracts = { ...this.contracts, [contractAddress]: contract }

return toAddListener
}

Expand Down

0 comments on commit ce513da

Please sign in to comment.