chore: extract deal observer loop #42

Open
wants to merge 14 commits into
base: main
from

NikolasHaimerl
Contributor

This PR introduces the following changes:

  1. Extract the logic of the deal observer loop into its own file
  2. Add a test for the deal observer loop
  3. Extract the mocked rpc endpoint into its own file for easier reuse.

Member

@bajtos bajtos left a comment

Great idea to move the loop functions from bin/ to lib/!

@bajtos bajtos mentioned this pull request Jan 28, 2025
@NikolasHaimerl NikolasHaimerl requested a review from bajtos January 29, 2025 06:32
finalityEpochs,
LOOP_INTERVAL,
INFLUXDB_TOKEN,
signal
Member

This method has a lot of arguments, and working with positional arguments is hard in this case. Imagine using an editor that doesn't infer types, or changing the argument list later on.

What do you think about changing this to an options object?

Member

+1 to use an options object (named parameters).

Contributor Author

@bajtos isn't this in contrast to what you mentioned in an earlier PR?

Member

In the earlier PR, @bajtos also suggested using an options object / param object / named parameters.

Contributor Author

I see, I must have misunderstood then.

Member

> @bajtos isn't this in contrast to what you mentioned in an earlier PR?

It's great that you asked. As Julian pointed out, I meant the same thing in that other comment.

The JavaScript language does not provide syntax for named parameters. We implement named parameters by passing around an object where each property corresponds to one parameter.

// definition
function doSomeStuff({ pgClient, signal }) {
  // ...
}

// example usage
const pgClient = /*..*/

doSomeStuff({
  pgClient,
  signal: AbortSignal.timeout(10_000)
})

Member

The TypeScript typings are slightly more complex - you need to describe the single positional parameter accepting an object, and then describe the properties of that object.

/**
 * @param {object} params
 * @param {pg.Client} params.pgClient
 * @param {AbortSignal} params.signal
 */
function doSomeStuff({ pgClient, signal }) {
  // ...
}

I personally tend to name the object argument args, see e.g. here:

https://github.com/CheckerNetwork/piece-indexer/blob/765dbfe7705ce375f7b5a317f4912264e7a70ffa/indexer/lib/ipni-watcher.js#L11-L16

/**
 * @param {object} args
 * @param {number} args.minSyncIntervalInMs
 * @param {AbortSignal} [args.signal]
 */
export async function * runIpniSync ({ minSyncIntervalInMs, signal }) {
  // ...
}

The syntax [args.signal] tells the type-checker that signal is an optional parameter.
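
Because an optional property may be undefined at runtime, the function body has to guard for that. A minimal sketch, assuming a made-up helper named shouldKeepRunning (it is not part of this PR or of piece-indexer) that uses optional chaining for the guard:

```javascript
/**
 * Hypothetical helper, used only to illustrate the optional-property syntax.
 *
 * @param {object} args
 * @param {number} args.minSyncIntervalInMs
 * @param {AbortSignal} [args.signal]
 * @returns {boolean}
 */
function shouldKeepRunning ({ minSyncIntervalInMs, signal }) {
  // `signal` is optional, so it may be undefined here; optional chaining
  // avoids a TypeError when the caller leaves it out.
  if (signal?.aborted) return false
  return minSyncIntervalInMs > 0
}

// The caller may omit `signal` entirely...
console.log(shouldKeepRunning({ minSyncIntervalInMs: 1000 })) // true

// ...or pass one that has already been aborted.
const abortedController = new AbortController()
abortedController.abort()
console.log(shouldKeepRunning({ minSyncIntervalInMs: 1000, signal: abortedController.signal })) // false
```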

Contributor Author

@NikolasHaimerl NikolasHaimerl Jan 29, 2025

I am not sure I quite understand. How does ({ minSyncIntervalInMs, signal }) differ from (minSyncIntervalInMs, signal) in terms of readability? Or do you mean to have a single argument (args) and then access args.pgPool?

Member

  1. At the call site, positional arguments carry no names, so the second form quickly becomes (1000, signal), whereas the first is always clear: ({ minSyncIntervalInMs: 1000, signal })
  2. Argument order doesn't matter with an object: ({ a, b }) works just like ({ b, a })

There are probably more factors here, but these are the most important ones in my mind.
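
To make the two points concrete, here is a small sketch. The function describeSync is hypothetical and exists only for this comparison; it is not code from the PR.

```javascript
// Hypothetical function, used only to compare the two calling styles.
function describeSync ({ minSyncIntervalInMs, signal }) {
  return `interval=${minSyncIntervalInMs}ms aborted=${signal.aborted}`
}

const { signal } = new AbortController()

// Each value is labelled at the call site, even when a literal is passed.
const first = describeSync({ minSyncIntervalInMs: 1000, signal })

// Property order does not matter; this call passes the same arguments.
const second = describeSync({ signal, minSyncIntervalInMs: 1000 })

console.log(first === second) // true
```

With positional parameters, swapping the two values would silently pass the signal as the interval.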

import { chainHeadTestData } from './test_data/chainHead.js'
import { rawActorEventTestData } from './test_data/rawActorEvent.js'

export const makeRpcRequest = async (method, params) => {
Member

I'm not sure this will scale well. @bajtos what do you think about this helper? One test might need a different test data set returned than another. My suspicion is that it will scale better to define this makeRpcRequest function where the tests are defined (potentially multiple times).

Member

@bajtos bajtos Jan 29, 2025

I share your concerns and agree it's best to define the stub makeRpcRequest function in every test.

We can implement a builder helper to make it easier to implement such stubs.

// usage in tests
const makeRpcRequest = buildMakeRpcRequest({
  'Filecoin.ChainHead': chainHeadTestData
})

// implementation
export const buildMakeRpcRequest = (methodsToResults) => {
  return async (method, _params) => {
    if (methodsToResults[method]) return methodsToResults[method]
    throw new Error(`Unsupported RPC API method: "${method}"`)
  }
}
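
One detail worth noting about the builder above: the truthiness check treats a stubbed result of null, 0, or an empty string as an unsupported method. A possible variant, sketched under the assumption that Object.hasOwn is available (Node.js 16.9 or later), checks for the key instead. The test data below is made up for the example.

```javascript
// Variant of the builder that checks whether the method was registered,
// rather than whether its stored result is truthy.
const buildMakeRpcRequest = (methodsToResults) => {
  return async (method, _params) => {
    if (Object.hasOwn(methodsToResults, method)) return methodsToResults[method]
    throw new Error(`Unsupported RPC API method: "${method}"`)
  }
}

// Hypothetical test data, made up for this example.
const makeRpcRequest = buildMakeRpcRequest({
  'Filecoin.ChainHead': { Height: 42 },
  'Example.NullResult': null
})
```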

Contributor Author

@NikolasHaimerl NikolasHaimerl Jan 29, 2025

Isn't this adding quite a bit of complexity for a simple problem?
We do not need this complexity right now, and it is also not foreseeable whether we will need it in the future. I would much rather add complexity when we need it instead of before.

Member

@juliangruber juliangruber Jan 29, 2025

Wouldn't it be less complex to inline this function (without the helper) than to create a new utility (i.e. a new abstraction)?
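
For comparison, an inlined stub might look roughly like the sketch below. The test data value is a placeholder made up for the example; in the PR, the real fixtures are imported from ./test_data/.

```javascript
// Placeholder test data, made up for this example.
const chainHeadTestData = { Height: 100 }

// Stub defined right next to the test that uses it, with no shared helper.
// Each test can return whatever data set it needs.
const makeRpcRequest = async (method, _params) => {
  switch (method) {
    case 'Filecoin.ChainHead':
      return chainHeadTestData
    default:
      throw new Error(`Unsupported RPC API method: "${method}"`)
  }
}
```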

backend/test/deal-observer.test.js
Comment on lines 89 to 91
after(async () => {
await pgPool.query('DELETE FROM active_deals')
})
Member

Would you mind not cleaning the DB after tests in this pull request and waiting until we resolve the discussion in #41?

backend/test/deal-observer.test.js
Comment on lines 110 to 112
do {
rows = (await pgPool.query('SELECT * FROM active_deals')).rows
} while (rows.length !== 360)
Member

Nitpick - feel free to ignore.

Fetching data for all rows is not a very efficient way to count the number of rows in a table. It does not matter in this test, since you are fetching up to 360 rows, but it could become a problem if used elsewhere.

A more efficient solution is to use the SQL aggregate function COUNT().

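while (true) {
  const { rows } = await pgPool.query('SELECT COUNT(*) FROM active_deals')
  // COUNT(*) is a bigint, which node-postgres returns as a string by default,
  // so convert it before comparing with a number.
  if (Number(rows[0].count) === 360) break
}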
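
The loop above queries the database back to back and never gives up if the expected count is not reached. A minimal sketch of a polling helper that pauses between queries and enforces a deadline follows. The names waitForRowCount and fakePool, and the default interval and timeout values, are assumptions made for illustration; fakePool stands in for pgPool so the sketch runs without a database.

```javascript
// Poll until the table holds the expected number of rows, pausing between
// queries and giving up after a deadline instead of spinning forever.
const waitForRowCount = async ({ pgPool, expected, intervalMs = 50, timeoutMs = 5000 }) => {
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    const { rows } = await pgPool.query('SELECT COUNT(*) FROM active_deals')
    // COUNT(*) is a bigint, which node-postgres returns as a string by default.
    if (Number(rows[0].count) === expected) return
    await new Promise(resolve => setTimeout(resolve, intervalMs))
  }
  throw new Error(`Timed out waiting for ${expected} rows in active_deals`)
}

// Hypothetical stand-in for pgPool: it reports 120 more rows on every query
// and returns the count as a string, like node-postgres does.
let fakeCount = 0
const fakePool = {
  query: async () => {
    fakeCount += 120
    return { rows: [{ count: String(fakeCount) }] }
  }
}
```

In a test this could be called as await waitForRowCount({ pgPool, expected: 360 }).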

* @param {string | undefined} influxToken
* @returns {Promise<void>}
* */
export const dealObserverLoop = async (makeRpcRequest, pgPool, recordTelemetry, Sentry, maxPastEpochs, finalityEpochs, loopInterval, influxToken, signal) => {
Contributor

Do you think that extracting the loop logic to a separate function could be a good idea? I had something like this in mind:

// lib/loop.js
// Assumes `timers` is imported from 'node:timers/promises' and a `slug` helper is in scope.
const loop = async ({ signal, name, interval, fn, recordTelemetry, captureException }) => {
  while (!signal?.aborted) {
    const start = Date.now()
    try {
      await fn()
    } catch (e) {
      console.error(e)
      captureException(e)
    }
    const dt = Date.now() - start
    console.log(`Loop "${name}" took ${dt}ms`)
    recordTelemetry(`loop_${slug(name, '_')}`, point => {
      point.intField('interval_ms', interval)
      point.intField('duration_ms', dt)
    })

    if (dt < interval) {
      await timers.setTimeout(interval - dt)
    }
  }
}

// lib/deal-observer-loop.js
const dealObserverLoop = (pgPool, rpcClient, opts) => async () => {
  // logic goes here
}

// lib/deal-submitter-loop.js
const dealSubmitLoop = (pgPool, opts) => async () => {
  // logic goes here
}

// bin/deal-observer-backend.js
await Promise.all([
  loop({
    name: 'dealObserver',
    interval: 1000,
    fn: dealObserverLoop(pgPool, rpcClient, opts),
    recordTelemetry: console.log,
    captureException: console.error,
  }),
  loop({
    name: 'dealSubmit',
    interval: 1000,
    fn: dealSubmitLoop(pgPool, opts),
    recordTelemetry: console.log,
    captureException: console.error,
  })
])

I feel that something like this could help us to keep code more DRY, while also keeping the loop function itself clean as we don't have to pass so many arguments around (sentry, influx, and other dependencies).
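
A runnable sketch of how such a generic loop could be exercised in a test is shown below. It is a simplified stand-in, not the proposed implementation: it uses the global setTimeout instead of node:timers/promises, passes telemetry as a plain object rather than a point callback, and skips the final sleep once the signal is aborted. The demo callback and its counters are made up for the example.

```javascript
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms))

// Simplified generic loop: runs `fn` repeatedly until the signal is aborted,
// reporting errors and timings through the injected callbacks.
const loop = async ({ signal, name, interval, fn, recordTelemetry, captureException }) => {
  while (!signal?.aborted) {
    const start = Date.now()
    try {
      await fn()
    } catch (err) {
      // One failing iteration must not stop the loop.
      captureException(err)
    }
    const dt = Date.now() - start
    recordTelemetry(`loop_${name}`, { interval_ms: interval, duration_ms: dt })
    // Wait out the rest of the interval, unless we were asked to stop.
    if (dt < interval && !signal?.aborted) await sleep(interval - dt)
  }
}

// Demo: the second iteration throws, the third one aborts the loop.
const controller = new AbortController()
const capturedErrors = []
const telemetryNames = []
let runs = 0

const done = loop({
  signal: controller.signal,
  name: 'demo',
  interval: 10,
  fn: async () => {
    runs++
    if (runs === 2) throw new Error('boom')
    if (runs === 3) controller.abort()
  },
  recordTelemetry: (metricName) => telemetryNames.push(metricName),
  captureException: (err) => capturedErrors.push(err)
})
```

The dependencies (telemetry, error reporting) are injected once into the generic loop, so the per-loop functions only need their own arguments.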

Contributor Author

This is also a valid approach, given that all loops follow the same overall structure outside of await fn().

Contributor

@pyropy pyropy Jan 29, 2025

@bajtos @juliangruber I'd appreciate your input on this as I'd like to apply changes to #33 as well

Member

As discussed, #33 can be landed without the loop refactor (the refactor can follow in a later PR).

4 participants