chore: extract deal observer loop #42

Open
wants to merge 14 commits into
base: main
from
50 changes: 12 additions & 38 deletions backend/bin/deal-observer-backend.js
@@ -1,12 +1,10 @@
import { createPgPool } from '@filecoin-station/deal-observer-db'
import * as Sentry from '@sentry/node'
import timers from 'node:timers/promises'
import slug from 'slug'
import '../lib/instrument.js'
import { createInflux } from '../lib/telemetry.js'
import { getChainHead, rpcRequest } from '../lib/rpc-service/service.js'
import { fetchDealWithHighestActivatedEpoch, observeBuiltinActorEvents } from '../lib/deal-observer.js'
import { rpcRequest } from '../lib/rpc-service/service.js'
import assert from 'node:assert'
import { dealObserverLoop } from '../lib/deal-observer-loop.js'

const { INFLUXDB_TOKEN } = process.env
if (!INFLUXDB_TOKEN) {
@@ -19,42 +17,18 @@ const finalityEpochs = 940
const maxPastEpochs = 1999
assert(finalityEpochs <= maxPastEpochs)
const pgPool = await createPgPool()

const LOOP_NAME = 'Built-in actor events'
const { recordTelemetry } = createInflux(INFLUXDB_TOKEN)

const dealObserverLoop = async (makeRpcRequest, pgPool) => {
while (true) {
const start = Date.now()
try {
const currentChainHead = await getChainHead(makeRpcRequest)
const currentFinalizedChainHead = currentChainHead.Height - finalityEpochs
// If the storage is empty we start 2000 blocks into the past as that is the furthest we can go with the public glif rpc endpoints.
const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
const lastEpochStored = lastInsertedDeal ? lastInsertedDeal.activated_at_epoch : currentChainHead.Height - maxPastEpochs
for (let epoch = lastEpochStored + 1; epoch <= currentFinalizedChainHead; epoch++) {
await observeBuiltinActorEvents(epoch, pgPool, makeRpcRequest)
}
} catch (e) {
console.error(e)
Sentry.captureException(e)
}
const dt = Date.now() - start
console.log(`Loop "${LOOP_NAME}" took ${dt}ms`)

if (INFLUXDB_TOKEN) {
recordTelemetry(`loop_${slug(LOOP_NAME, '_')}`, point => {
point.intField('interval_ms', LOOP_INTERVAL)
point.intField('duration_ms', dt)
})
}
if (dt < LOOP_INTERVAL) {
await timers.setTimeout(LOOP_INTERVAL - dt)
}
}
}
const controller = new AbortController()
const { signal } = controller

await dealObserverLoop(
rpcRequest,
pgPool
pgPool,
recordTelemetry,
Sentry,
maxPastEpochs,
finalityEpochs,
LOOP_INTERVAL,
INFLUXDB_TOKEN,
signal
Member

This function takes a lot of arguments, and positional arguments are hard to work with in this case. Imagine using an editor that doesn't infer types, or changing the argument list later on.

What do you think about changing this to an options object?

Member

+1 to use an options object (named parameters).

Contributor Author

@bajtos isn't this in contrast to what you mentioned in an earlier PR?

Member

In the earlier PR, @bajtos also suggested using an options object (also called a param object or named parameters).

Contributor Author

I see, I must have misunderstood then.

Member

> @bajtos isn't this in contrast to what you mentioned in an earlier PR?

It's great that you asked. As Julian pointed out, I meant the same thing in that other comment.

The JavaScript language does not provide syntax for named parameters. We emulate them by passing a single object in which each property corresponds to one parameter.

// definition
function doSomeStuff({ pgClient, signal }) {
  // ...
}

// example usage
const pgClient = /*..*/

doSomeStuff({
  pgClient,
  signal: AbortSignal.timeout(10_000)
})

Member

The TypeScript typings are slightly more complex - you need to describe the single positional parameter accepting an object, and then describe the properties of that object.

/**
 * @param {object} params
 * @param {pg.Client} params.pgClient
 * @param {AbortSignal} params.signal
 */
function doSomeStuff({ pgClient, signal }) {
  // ...
}

I personally tend to name the object argument args, see e.g. here:

https://github.com/CheckerNetwork/piece-indexer/blob/765dbfe7705ce375f7b5a317f4912264e7a70ffa/indexer/lib/ipni-watcher.js#L11-L16

/**
 * @param {object} args
 * @param {number} args.minSyncIntervalInMs
 * @param {AbortSignal} [args.signal]
 */
export async function * runIpniSync ({ minSyncIntervalInMs, signal }) {
  // ...
}

The syntax [args.signal] tells the type-checker that signal is an optional parameter.
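
A minimal sketch of how the optional property behaves at runtime, assuming a hypothetical function countIterations that is not part of this PR: when signal is omitted, the optional chaining in signal?.aborted evaluates to undefined, so the loop simply runs to completion.

```javascript
/**
 * Hypothetical example function, for illustration only.
 *
 * @param {object} args
 * @param {number} args.maxIterations
 * @param {AbortSignal} [args.signal] Optional; omit it to run every iteration.
 * @returns {number} How many iterations actually ran.
 */
function countIterations ({ maxIterations, signal }) {
  let n = 0
  // `signal?.aborted` is `undefined` (falsy) when no signal was passed.
  while (n < maxIterations && !signal?.aborted) {
    n++
  }
  return n
}

// No signal: all iterations run.
const withoutSignal = countIterations({ maxIterations: 3 })

// Already-aborted signal: the loop body never runs.
const withAbortedSignal = countIterations({
  maxIterations: 3,
  signal: AbortSignal.abort()
})

console.log(withoutSignal, withAbortedSignal) // 3 0
```

The same pattern is what makes `while (!signal?.aborted)` safe to call without a signal.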

Contributor Author

@NikolasHaimerl Jan 29, 2025

I am not sure I quite understand. How does ({minSyncIntervalInMs, signal}) differ from (minSyncIntervalInMs, signal) in terms of readability? Or do you mean to have a single argument (args) and then call args.pgPool?

Member

  1. Depending on the call site, you might not be passing named variables. The positional form then quickly becomes (1000, signal), whereas the object form is always clear: ({ minSyncIntervalInMs: 1000, signal })
  2. Argument order doesn't matter with an object: ({ a, b }) works just like ({ b, a })

There are probably more factors here, but these are the most important ones in my mind.
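
The two points above can be sketched as follows. The function names describeSync and describeSyncPositional are hypothetical and exist only for this illustration. The object form gives the same result regardless of property order, while swapping two positional arguments goes unnoticed by the language and produces a different, wrong result.

```javascript
// Hypothetical functions, for illustration only.
const describeSync = ({ minSyncIntervalInMs, signal }) =>
  `interval=${minSyncIntervalInMs} aborted=${signal?.aborted}`

const describeSyncPositional = (minSyncIntervalInMs, signal) =>
  `interval=${minSyncIntervalInMs} aborted=${signal?.aborted}`

const signal = new AbortController().signal

// Object form: property order at the call site does not matter.
const inOrder = describeSync({ minSyncIntervalInMs: 1000, signal })
const reordered = describeSync({ signal, minSyncIntervalInMs: 1000 })
console.log(inOrder === reordered) // true

// Positional form: accidentally swapping the arguments is not an error,
// it just silently yields a different result.
const swapped = describeSyncPositional(signal, 1000)
console.log(swapped === inOrder) // false
```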

)
50 changes: 50 additions & 0 deletions backend/lib/deal-observer-loop.js
@@ -0,0 +1,50 @@
import { fetchDealWithHighestActivatedEpoch, observeBuiltinActorEvents } from './deal-observer.js'
import { getChainHead } from './rpc-service/service.js'
import timers from 'node:timers/promises'
import slug from 'slug'
/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
/** @import {Point} from '@influxdata/influxdb-client' */

/**
* @param {(method:string,params:object) => object} makeRpcRequest
* @param {Queryable} pgPool
* @param {(name: string, fn: (p: Point) => void) => void} recordTelemetry
* @param {import("@sentry/node")} Sentry
* @param {number} maxPastEpochs
* @param {number} finalityEpochs
* @param {number} loopInterval
* @param {string | undefined} influxToken
* @returns {Promise<void>}
* */
export const dealObserverLoop = async (makeRpcRequest, pgPool, recordTelemetry, Sentry, maxPastEpochs, finalityEpochs, loopInterval, influxToken, signal) => {
Contributor

Do you think that extracting the loop logic to a separate function could be a good idea? I had something like this in mind:

// lib/loop.js
import timers from 'node:timers/promises'
import slug from 'slug'

const loop = async ({ signal, name, interval, fn, recordTelemetry, captureException }) => {
  while (!signal?.aborted) {
    const start = Date.now()
    try {
      await fn()
    } catch (e) {
      console.error(e)
      captureException(e)
    }
    const dt = Date.now() - start
    console.log(`Loop "${name}" took ${dt}ms`)
    recordTelemetry(`loop_${slug(name, '_')}`, point => {
      point.intField('interval_ms', interval)
      point.intField('duration_ms', dt)
    })

    if (dt < interval) {
      await timers.setTimeout(interval - dt)
    }
  }
}

// lib/deal-observer-loop.js
const dealObserverLoop = (pgPool, rpcClient, opts) => async () => {
  // logic goes here
}

// lib/deal-submitter-loop.js
const dealSubmitLoop = (pgPool, opts) => async () => {
  // logic goes here
}

// bin/deal-observer-backend.js
await Promise.all([
  loop({
    name: 'dealObserver',
    interval: 1000,
    fn: dealObserverLoop(pgPool, rpcClient, opts),
    recordTelemetry: console.log,
    captureException: console.error,
  }),
  loop({
    name: 'dealSubmit',
    interval: 1000,
    fn: dealSubmitLoop(pgPool, opts),
    recordTelemetry: console.log,
    captureException: console.error,
  })
])

I feel that something like this could help us to keep code more DRY, while also keeping the loop function itself clean as we don't have to pass so many arguments around (sentry, influx, and other dependencies).
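
A runnable sketch of the generic loop idea, under a few assumptions: telemetry is left out, captureException defaults to console.error, and the abort signal is also passed to timers.setTimeout so that an abort interrupts the sleep instead of waiting for the interval to elapse. That last detail is an addition here and not something the comment proposes.

```javascript
import timers from 'node:timers/promises'

// Hypothetical generic runner: calls `fn` roughly every `interval` ms
// until `signal` is aborted.
const loop = async ({ name, interval, fn, signal, captureException = console.error }) => {
  while (!signal?.aborted) {
    const start = Date.now()
    try {
      await fn()
    } catch (err) {
      // A failing iteration is reported, but does not stop the loop.
      captureException(err)
    }
    const dt = Date.now() - start
    console.log(`Loop "${name}" took ${dt}ms`)

    if (dt < interval) {
      try {
        // The promise rejects with an AbortError as soon as the signal aborts.
        await timers.setTimeout(interval - dt, undefined, { signal })
      } catch (err) {
        if (err.name !== 'AbortError') throw err
      }
    }
  }
}

// Usage: the job aborts the loop from inside its third iteration.
const controller = new AbortController()
let iterations = 0

await loop({
  name: 'demo',
  interval: 10,
  signal: controller.signal,
  fn: async () => {
    iterations++
    if (iterations === 3) controller.abort()
  }
})

console.log(iterations) // 3
```

With this shape, each concrete loop only supplies its fn, and dependencies such as Sentry or Influx are wired once at the call site.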

Contributor Author

This is also a valid approach, given that all loops follow the same overall structure outside of await fn().

Contributor

@pyropy Jan 29, 2025

@bajtos @juliangruber I'd appreciate your input on this as I'd like to apply changes to #33 as well

Member

As discussed, #33 can land without the loop refactor; the refactor can follow in a separate PR.

const LOOP_NAME = 'Built-in actor events'

while (!signal?.aborted) {
const start = Date.now()
try {
const currentChainHead = await getChainHead(makeRpcRequest)
const currentFinalizedChainHead = currentChainHead.Height - finalityEpochs
// If the storage is empty we start 2000 blocks into the past as that is the furthest we can go with the public glif rpc endpoints.
const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
const lastEpochStored = lastInsertedDeal ? lastInsertedDeal.activated_at_epoch : currentChainHead.Height - maxPastEpochs
for (let epoch = lastEpochStored + 1; epoch <= currentFinalizedChainHead; epoch++) {
await observeBuiltinActorEvents(epoch, pgPool, makeRpcRequest)
}
} catch (e) {
console.error(e)
Sentry.captureException(e)
}
const dt = Date.now() - start
console.log(`Loop "${LOOP_NAME}" took ${dt}ms`)

if (influxToken) {
recordTelemetry(`loop_${slug(LOOP_NAME, '_')}`, point => {
point.intField('interval_ms', loopInterval)
point.intField('duration_ms', dt)
})
}
if (dt < loopInterval) {
await timers.setTimeout(loopInterval - dt)
}
}
}
59 changes: 59 additions & 0 deletions backend/test/deal-observer-loop.test.js
@@ -0,0 +1,59 @@
import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db'
import { dealObserverLoop } from '../lib/deal-observer-loop.js'
import { before, beforeEach, it, describe, after } from 'node:test'
import { rawActorEventTestData } from './test_data/rawActorEvent.js'
import { chainHeadTestData } from './test_data/chainHead.js'
import { parse } from '@ipld/dag-json'

describe('dealObserverLoop', () => {
let pgPool
const makeRpcRequest = async (method, params) => {
switch (method) {
case 'Filecoin.ChainHead':
return parse(JSON.stringify(chainHeadTestData))
case 'Filecoin.GetActorEventsRaw':
return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight)
default:
throw new Error(`Unsupported RPC API method: "${method}"`)
}
}
before(async () => {
pgPool = await createPgPool()
await migrateWithPgClient(pgPool)
})

after(async () => {
await pgPool.end()
})

beforeEach(async () => {
await pgPool.query('DELETE FROM active_deals')
})

it('then deal observer loop fetches new active deals and stores them in storage', async (t) => {
const controller = new AbortController()
const { signal } = controller
const waitForDealCount = async (targetCount) => {
while (true) {
const { rows } = (await pgPool.query('SELECT COUNT(*) FROM active_deals'))
console.log('Current deal count:', rows[0].count)
if (parseInt(rows[0].count) === targetCount) break
}
controller.abort()
}
const failOnTimeout = async () => { await setTimeout(() => { if (!signal.aborted) { throw new Error('Test timed out') } }, 2000) }
await Promise.all([failOnTimeout(), waitForDealCount(360),
dealObserverLoop(
makeRpcRequest,
pgPool,
undefined,
undefined,
// The testdata has a total amount of 11 blocks
11,
0,
100,
undefined,
signal
)])
})
})
2 changes: 1 addition & 1 deletion backend/test/deal-observer.test.js
@@ -1,5 +1,5 @@
import assert from 'node:assert'
import { after, before, beforeEach, describe, it } from 'node:test'
import { after, before, beforeEach, it, describe } from 'node:test'
import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db'
import { fetchDealWithHighestActivatedEpoch, storeActiveDeals } from '../lib/deal-observer.js'
import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js'
4 changes: 2 additions & 2 deletions backend/test/rpc-client.test.js
@@ -1,11 +1,11 @@
import assert from 'node:assert'
import { describe, it } from 'node:test'
import { chainHeadTestData } from './test_data/chainHead.js'
import { rawActorEventTestData } from './test_data/rawActorEvent.js'
import { parse } from '@ipld/dag-json'
import { getActorEvents, getActorEventsFilter, getChainHead, rpcRequest } from '../lib/rpc-service/service.js'
import { ClaimEvent } from '../lib/rpc-service/data-types.js'
import { Value } from '@sinclair/typebox/value'
import { rawActorEventTestData } from './test_data/rawActorEvent.js'

describe('RpcApiClient', () => {
const makeRpcRequest = async (method, params) => {
@@ -15,7 +15,7 @@ describe('RpcApiClient', () => {
case 'Filecoin.GetActorEventsRaw':
return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight)
default:
console.error('Unknown method')
throw new Error(`Unsupported RPC API method: "${method}"`)
}
}
